This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 84896a98ebe [HUDI-9386] Deprecate startCommitWithTime(instantTime, actionType) (#13261)
84896a98ebe is described below
commit 84896a98ebe88675951147fe3a6a7ef53692eedb
Author: Tim Brown <[email protected]>
AuthorDate: Tue May 20 18:52:06 2025 -0500
[HUDI-9386] Deprecate startCommitWithTime(instantTime, actionType) (#13261)
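A minimal before/after sketch of the call-site migration this deprecation implies, assuming a SparkRDDWriteClient named client and a hypothetical partition list partitionsToDelete (the constants mirror call sites updated in the diff below):

    // Before (deprecated): the caller generates an instant time and passes it in.
    String instantTime = client.createNewInstantTime();
    client.startCommitWithTime(instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
    client.deletePartitions(partitionsToDelete, instantTime);

    // After: the write client starts the commit and returns the instant time.
    String instantTime = client.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION);
    client.deletePartitions(partitionsToDelete, instantTime);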
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 29 +++++++++++----
.../metadata/HoodieBackedTableMetadataWriter.java | 2 +-
.../hudi/client/TestBaseHoodieWriteClient.java | 8 ++--
.../apache/hudi/client/WriteClientTestUtils.java | 30 +++++++++++++++
.../FlinkHoodieBackedTableMetadataWriter.java | 2 +-
.../BucketIndexBulkInsertPartitionerWithRows.java | 19 ++++------
.../SparkHoodieBackedTableMetadataWriter.java | 2 +-
...ieBackedTableMetadataWriterTableVersionSix.java | 2 +-
.../TestHoodieClientOnCopyOnWriteStorage.java | 5 ++-
.../rollback/HoodieClientRollbackTestBase.java | 3 +-
...dieSparkCopyOnWriteTableArchiveWithReplace.java | 3 +-
.../examples/spark/HoodieWriteClientExample.java | 3 +-
.../testsuite/HoodieDeltaStreamerWrapper.java | 4 +-
.../BaseDatasetBulkInsertCommitActionExecutor.java | 17 +++++----
.../DatasetBucketRescaleCommitActionExecutor.java | 24 ++++++------
.../DatasetBulkInsertCommitActionExecutor.java | 8 ++--
...setBulkInsertOverwriteCommitActionExecutor.java | 5 +--
...lkInsertOverwriteTableCommitActionExecutor.java | 5 +--
...eamerDatasetBulkInsertCommitActionExecutor.java | 10 +----
.../internal/DataSourceInternalWriterHelper.java | 4 +-
.../HoodieDataSourceInternalBatchWrite.java | 4 +-
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 19 ++++------
.../AlterHoodieTableAddColumnsCommand.scala | 4 +-
.../spark/sql/hudi/command/AlterTableCommand.scala | 3 +-
.../org/apache/hudi/TestMetadataTableSupport.java | 3 +-
.../hudi/client/TestHoodieClientMultiWriter.java | 15 ++++----
.../TestHoodieDataSourceInternalBatchWrite.java | 43 +++++++++++++---------
.../table/action/compact/TestAsyncCompaction.java | 2 +-
.../hudi/functional/TestRecordLevelIndex.scala | 3 +-
.../apache/spark/sql/hudi/ddl/TestAlterTable.scala | 3 +-
.../sql/hudi/procedure/TestTTLProcedure.scala | 4 +-
.../hudi/utilities/HoodieDropPartitionsTool.java | 20 ++--------
.../apache/hudi/utilities/streamer/StreamSync.java | 41 ++++++++-------------
.../deltastreamer/TestHoodieDeltaStreamer.java | 2 +-
34 files changed, 178 insertions(+), 173 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index afc0338f3e0..3d162d90a95 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -941,11 +941,19 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
return startCommit(metaClient.getCommitActionType(), metaClient);
}
+ /**
+ * Provides a new commit time for the provided action.
+ */
+ public String startCommit(String actionType) {
+ HoodieTableMetaClient metaClient = createMetaClient(true);
+ return startCommit(actionType, metaClient);
+ }
+
/**
* Provides a new commit time for a write operation (insert/update/delete/insert_overwrite/insert_overwrite_table) with specified action.
*/
public String startCommit(String actionType, HoodieTableMetaClient metaClient) {
- return startCommitWithTime(Option.empty(), actionType, metaClient);
+ return startCommit(Option.empty(), actionType, metaClient);
}
/**
@@ -954,21 +962,28 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
*/
public void startCommitWithTime(String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
- startCommitWithTime(Option.of(instantTime), metaClient.getCommitActionType(), metaClient);
+ startCommit(Option.of(instantTime), metaClient.getCommitActionType(), metaClient);
}
/**
- * Completes a new commit time for a write operation (insert/update/delete/insert_overwrite/insert_overwrite_table) with specified action.
+ * Starts a new commit time for a write operation against the metadata table with the provided instant and action type.
*/
- public void startCommitWithTime(String instantTime, String actionType) {
- HoodieTableMetaClient metaClient = createMetaClient(true);
- startCommitWithTime(Option.of(instantTime), actionType, metaClient);
+ public void startCommitForMetadataTable(HoodieTableMetaClient metadataMetaClient, String instantTime, String actionType) {
+ ValidationUtils.checkArgument(metadataMetaClient.isMetadataTable(), "Attempting to create an instant with a predetermined time on a non-metadata table.");
+ startCommit(Option.of(instantTime), actionType, metadataMetaClient);
}
/**
* Starts a new commit time for a write operation (insert/update/delete) with specified action.
+ *
+ * @param providedInstantTime an optional argument that should only be provided for writes to the metadata table or for testing purposes.
+ * If not provided, a new instant time will be generated and returned to the caller.
+ * @param actionType the type of commit
+ * @param metaClient a meta client for the table
+ * @return the requested instant time for the commit that was started
*/
- private String startCommitWithTime(Option<String> providedInstantTime, String actionType, HoodieTableMetaClient metaClient) {
+ @VisibleForTesting
+ String startCommit(Option<String> providedInstantTime, String actionType, HoodieTableMetaClient metaClient) {
if (needsUpgrade(metaClient)) {
// unclear what instant to use, since upgrade does have a given instant.
executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient, Option.empty()));
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 373ec2d752a..10e5f736e5a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1403,7 +1403,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
metadataMetaClient.reloadActiveTimeline();
}
- writeClient.startCommitWithTime(instantTime);
+ writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, HoodieActiveTimeline.DELTA_COMMIT_ACTION);
preWrite(instantTime);
if (isInitializing) {
engineContext.setJobStatus(this.getClass().getSimpleName(), String.format("Bulk inserting at %s into metadata table %s", instantTime, metadataWriteConfig.getTableName()));
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
index 7c608cfa9b4..75dd7c0cc50 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -36,7 +36,6 @@ import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieLockConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -135,14 +134,13 @@ class TestBaseHoodieWriteClient extends HoodieCommonTestHarness {
BaseHoodieTableServiceClient<String, String, String> tableServiceClient = mock(BaseHoodieTableServiceClient.class);
TestWriteClient writeClient = new TestWriteClient(writeConfig, table, Option.empty(), tableServiceClient, transactionManager, timeGenerator);
- writeClient.startCommit();
+ String instantTime = writeClient.startCommit("commit");
HoodieTimeline writeTimeline = metaClient.getActiveTimeline().getWriteTimeline();
assertTrue(writeTimeline.lastInstant().isPresent());
assertEquals("commit", writeTimeline.lastInstant().get().getAction());
- String commitTime = HoodieTestDataGenerator.getCommitTimeAtUTC(now.getEpochSecond());
- assertEquals(commitTime, writeTimeline.lastInstant().get().requestedTime());
- HoodieInstant expectedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieActiveTimeline.COMMIT_ACTION, commitTime, InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+ assertEquals(instantTime, writeTimeline.lastInstant().get().requestedTime());
+ HoodieInstant expectedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieActiveTimeline.COMMIT_ACTION, instantTime, InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
InOrder inOrder = Mockito.inOrder(transactionManager, timeGenerator);
inOrder.verify(transactionManager).beginTransaction(Option.empty(), Option.empty());
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/WriteClientTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/WriteClientTestUtils.java
new file mode 100644
index 00000000000..0a9aa24b571
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/WriteClientTestUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.util.Option;
+
+public class WriteClientTestUtils {
+ private WriteClientTestUtils() {
+ }
+
+ public static void startCommitWithTime(BaseHoodieWriteClient<?, ?, ?, ?> writeClient, String instantTime, String actionType) {
+ writeClient.startCommit(Option.of(instantTime), actionType, writeClient.createMetaClient(false));
+ }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index a88a8fa1073..1d67f62cceb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -159,7 +159,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
metadataMetaClient.reloadActiveTimeline();
}
- writeClient.startCommitWithTime(instantTime);
+ writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, HoodieActiveTimeline.DELTA_COMMIT_ACTION);
preWrite(instantTime);
List<WriteStatus> statuses = isInitializing
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
index 9816febb041..520427dbd45 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -18,7 +18,6 @@
package org.apache.hudi.execution.bulkinsert;
-import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
@@ -40,18 +39,16 @@ public class BucketIndexBulkInsertPartitionerWithRows implements BulkInsertParti
private FileSystemViewStorageConfig viewConfig;
public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, HoodieWriteConfig writeConfig) {
- this.indexKeyFields = indexKeyFields;
- this.numBucketsFunction = NumBucketsFunction.fromWriteConfig(writeConfig);
- this.writeConfig = writeConfig;
- if (writeConfig.isUsingRemotePartitioner()) {
- this.viewConfig = writeConfig.getViewStorageConfig();
- }
+ this(writeConfig, NumBucketsFunction.fromWriteConfig(writeConfig), indexKeyFields);
+ }
+
+ public BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig writeConfig, String expressions, String rule, int bucketNumber) {
+ this(writeConfig, new NumBucketsFunction(expressions, rule, bucketNumber), writeConfig.getBucketIndexHashFieldWithDefault());
}
- public BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig writeConfig, PartitionBucketIndexHashingConfig hashingConfig) {
- this.indexKeyFields = writeConfig.getBucketIndexHashFieldWithDefault();
- this.numBucketsFunction = new NumBucketsFunction(hashingConfig.getExpressions(),
- hashingConfig.getRule(), hashingConfig.getDefaultBucketNumber());
+ private BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig writeConfig, NumBucketsFunction numBucketsFunction, String indexKeyFields) {
+ this.indexKeyFields = indexKeyFields;
+ this.numBucketsFunction = numBucketsFunction;
this.writeConfig = writeConfig;
if (writeConfig.isUsingRemotePartitioner()) {
this.viewConfig = writeConfig.getViewStorageConfig();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 875ebd858ad..396ab22115f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -156,7 +156,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad
SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
- writeClient.startCommitWithTime(instantTime, actionType);
+ writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, actionType);
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index 81df778d2dd..eadabf63b7f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -132,7 +132,7 @@ public class SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
String actionType = CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, HoodieTableType.MERGE_ON_READ);
- writeClient.startCommitWithTime(instantTime, actionType);
+ writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, actionType);
writeClient.deletePartitions(partitionsToDrop, instantTime);
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 80c40158c0e..56d8ca9581d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
import org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
@@ -1264,7 +1265,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
// Do Insert Overwrite
String commitTime2 = "002";
- client.startCommitWithTime(commitTime2, REPLACE_COMMIT_ACTION);
+ WriteClientTestUtils.startCommitWithTime(writeClient, commitTime2, REPLACE_COMMIT_ACTION);
List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, batch2RecordsCount);
List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>(inserts2);
JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = jsc.parallelize(insertsAndUpdates2, 2);
@@ -1319,7 +1320,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
}
private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, String commitTime, List<String> deletePartitionPath) {
- client.startCommitWithTime(commitTime, REPLACE_COMMIT_ACTION);
+ WriteClientTestUtils.startCommitWithTime(client, commitTime, REPLACE_COMMIT_ACTION);
HoodieWriteResult writeResult = client.deletePartitions(deletePartitionPath, commitTime);
Set<String> deletePartitionReplaceFileIds = writeResult.getPartitionToReplaceFileIds().entrySet()
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
index 0aac5b948de..0d35eae5bb1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.rollback;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieFileGroup;
@@ -143,7 +144,7 @@ public class HoodieClientRollbackTestBase extends HoodieClientTestBase {
newCommitTime = "002";
records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
writeRecords = jsc.parallelize(records, 1);
- client.startCommitWithTime(newCommitTime, commitActionType);
+ WriteClientTestUtils.startCommitWithTime(client, newCommitTime, commitActionType);
HoodieWriteResult result = client.insertOverwrite(writeRecords, newCommitTime);
statuses = result.getWriteStatuses();
Assertions.assertNoWriteErrors(statuses.collect());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index 702cb67f66a..8a3ba8c4b6e 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -77,8 +77,7 @@ public class TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
assertEquals(21, countRecordsOptionallySince(jsc(), basePath(), sqlContext(), timeline1, Option.empty()));
// delete the 1st and the 2nd partition; 1 replace commit
- final String instantTime4 = client.createNewInstantTime();
- client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+ final String instantTime4 = client.startCommit(HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4);
// 2nd write batch; 6 commits for the 4th partition; the 6th commit to trigger archiving the replace commit
diff --git a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index 457020036e7..745107d6a8b 100644
--- a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -129,8 +129,7 @@ public class HoodieWriteClientExample {
client.delete(deleteRecords, newCommitTime);
// Delete by partition
- newCommitTime = client.startCommit();
- client.startCommitWithTime(newCommitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+ newCommitTime = client.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION);
LOG.info("Starting commit " + newCommitTime);
// The partition where the data needs to be deleted
List<String> partitionList = toBeDeleted.stream().map(s -> s.getPartitionPath()).distinct().collect(Collectors.toList());
diff --git a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 59b51bcb53c..73080621efa 100644
--- a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -86,8 +85,7 @@ public class HoodieDeltaStreamerWrapper extends HoodieDeltaStreamer {
.setConf(service.getStorage().getConf().newInstance())
.setBasePath(service.getCfg().targetBasePath)
.build();
- String instantTime = InProcessTimeGenerator.createNewInstantTime();
- InputBatch inputBatch = service.readFromSource(instantTime, metaClient).getLeft();
+ InputBatch inputBatch = service.readFromSource(metaClient).getLeft();
return Pair.of(inputBatch.getSchemaProvider(), Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<HoodieRecord>) inputBatch.getBatch().get()));
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index d4929c254a6..acdc139cf7c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -56,20 +56,19 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
protected final transient HoodieWriteConfig writeConfig;
protected final transient SparkRDDWriteClient writeClient;
- protected final String instantTime;
+ protected String instantTime;
protected HoodieTable table;
public BaseDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
- SparkRDDWriteClient writeClient,
- String instantTime) {
+ SparkRDDWriteClient writeClient) {
this.writeConfig = config;
this.writeClient = writeClient;
- this.instantTime = instantTime;
}
protected void preExecute() {
+ instantTime = writeClient.startCommit(getCommitActionType());
+ table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
table.validateInsertSchema();
- writeClient.startCommitWithTime(instantTime, getCommitActionType());
writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient());
}
@@ -97,13 +96,11 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
}
boolean populateMetaFields = writeConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS);
-
- table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
+ preExecute();
BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows = getPartitioner(populateMetaFields, isTablePartitioned);
Dataset<Row> hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, bulkInsertPartitionerRows, instantTime);
- preExecute();
HoodieWriteMetadata<JavaRDD<WriteStatus>> result = buildHoodieWriteMetadata(doExecute(hoodieDF, bulkInsertPartitionerRows.arePartitionRecordsSorted()));
afterExecute(result);
@@ -138,4 +135,8 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
return Collections.emptyMap();
}
+
+ public String getInstantTime() {
+ return instantTime;
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index 449283d872c..a5bf6013c52 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -41,17 +41,16 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(DatasetBucketRescaleCommitActionExecutor.class);
- private final PartitionBucketIndexHashingConfig hashingConfig;
+ private final String expression;
+ private final String rule;
+ private final int bucketNumber;
public DatasetBucketRescaleCommitActionExecutor(HoodieWriteConfig config,
- SparkRDDWriteClient writeClient,
- String instantTime) {
- super(config, writeClient, instantTime);
- String expression = config.getBucketIndexPartitionExpression();
- String rule = config.getBucketIndexPartitionRuleType();
- int bucketNumber = config.getBucketIndexNumBuckets();
- this.hashingConfig = new PartitionBucketIndexHashingConfig(expression,
- bucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, instantTime);
+ SparkRDDWriteClient writeClient) {
+ super(config, writeClient);
+ expression = config.getBucketIndexPartitionExpression();
+ rule = config.getBucketIndexPartitionRuleType();
+ bucketNumber = config.getBucketIndexNumBuckets();
}
/**
@@ -59,19 +58,20 @@ public class DatasetBucketRescaleCommitActionExecutor extends DatasetBulkInsertO
*/
@Override
protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean populateMetaFields, boolean isTablePartitioned) {
- return new BucketIndexBulkInsertPartitionerWithRows(writeClient.getConfig(), hashingConfig);
+ return new BucketIndexBulkInsertPartitionerWithRows(writeClient.getConfig(), expression, rule, bucketNumber);
}
/**
* create new hashing_config during afterExecute and before commit finished.
- * @param result
*/
@Override
protected void preExecute() {
super.preExecute();
+ PartitionBucketIndexHashingConfig hashingConfig = new PartitionBucketIndexHashingConfig(expression,
+ bucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, instantTime);
boolean res = PartitionBucketIndexHashingConfig.saveHashingConfig(hashingConfig, table.getMetaClient());
ValidationUtils.checkArgument(res);
- LOG.info("Finish to save hashing config " + hashingConfig);
+ LOG.info("Finish to save hashing config {}", hashingConfig);
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
index ea78a39966d..62d89b2c034 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
@@ -43,14 +43,14 @@ import java.util.stream.Collectors;
public class DatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
public DatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
- SparkRDDWriteClient writeClient,
- String instantTime) {
- super(config, writeClient, instantTime);
+ SparkRDDWriteClient writeClient) {
+ super(config, writeClient);
}
@Override
protected void preExecute() {
- // no op
+ instantTime = writeClient.startCommit();
+ table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index 324a52e915a..9780e90cf92 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -43,9 +43,8 @@ import java.util.stream.Collectors;
public class DatasetBulkInsertOverwriteCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
public DatasetBulkInsertOverwriteCommitActionExecutor(HoodieWriteConfig config,
- SparkRDDWriteClient writeClient,
- String instantTime) {
- super(config, writeClient, instantTime);
+ SparkRDDWriteClient writeClient) {
+ super(config, writeClient);
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
index d0e5bf99b49..772e388f911 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
@@ -35,9 +35,8 @@ import java.util.Map;
public class DatasetBulkInsertOverwriteTableCommitActionExecutor extends DatasetBulkInsertOverwriteCommitActionExecutor {
public DatasetBulkInsertOverwriteTableCommitActionExecutor(HoodieWriteConfig config,
- SparkRDDWriteClient writeClient,
- String instantTime) {
- super(config, writeClient, instantTime);
+ SparkRDDWriteClient writeClient) {
+ super(config, writeClient);
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
index fb5a501714c..f5f5d47e6cc 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -36,14 +36,8 @@ import org.apache.spark.sql.Row;
*/
public class HoodieStreamerDatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
- public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config, SparkRDDWriteClient writeClient, String instantTime) {
- super(config, writeClient, instantTime);
- }
-
- @Override
- protected void preExecute() {
- table.validateInsertSchema();
- writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient());
+ public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config, SparkRDDWriteClient writeClient) {
+ super(config, writeClient);
}
@Override
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 931f700cf65..9d47430762a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -64,7 +64,6 @@ public class DataSourceInternalWriterHelper {
this.extraMetadata = extraMetadata;
this.writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), writeConfig);
this.writeClient.setOperationType(operationType);
- this.writeClient.startCommitWithTime(instantTime);
this.hoodieTable = this.writeClient.initTable(operationType, Option.of(instantTime));
this.metaClient = HoodieTableMetaClient.builder()
@@ -98,10 +97,11 @@ public class DataSourceInternalWriterHelper {
writeClient.close();
}
- public void createInflightCommit() {
+ public String createInflightCommit() {
metaClient.getActiveTimeline().transitionRequestedToInflight(
metaClient.createNewInstant(State.REQUESTED, CommitUtils.getCommitActionType(operationType, metaClient.getTableType()), instantTime), Option.empty());
+ return instantTime;
}
public HoodieTable getHoodieTable() {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
index 7409db99730..551cc340ed5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
@@ -44,7 +44,6 @@ import java.util.stream.Collectors;
*/
public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
- private final String instantTime;
private final HoodieWriteConfig writeConfig;
private final StructType structType;
private final boolean arePartitionRecordsSorted;
@@ -54,7 +53,6 @@ public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
public HoodieDataSourceInternalBatchWrite(String instantTime, HoodieWriteConfig writeConfig, StructType structType, SparkSession jss,
StorageConfiguration<?> storageConf, Map<String, String> properties, boolean populateMetaFields, boolean arePartitionRecordsSorted) {
- this.instantTime = instantTime;
this.writeConfig = writeConfig;
this.structType = structType;
this.populateMetaFields = populateMetaFields;
@@ -66,7 +64,7 @@ public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
@Override
public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
- dataSourceInternalWriterHelper.createInflightCommit();
+ String instantTime = dataSourceInternalWriterHelper.createInflightCommit();
if (WriteOperationType.BULK_INSERT == dataSourceInternalWriterHelper.getWriteOperationType()) {
return new HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(), writeConfig, instantTime, structType, populateMetaFields, arePartitionRecordsSorted);
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 0e3f4477586..b058997438a 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -413,9 +413,8 @@ class HoodieSparkSqlWriterInternal {
streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
}
- instantTime = client.createNewInstantTime()
// Issue deletes
- client.startCommitWithTime(instantTime, commitActionType)
+ instantTime = client.startCommit(commitActionType)
val writeStatuses = DataSourceUtils.doDeleteOperation(client, hoodieKeysAndLocationsToDelete, instantTime, preppedSparkSqlWrites || preppedWriteOperation)
(writeStatuses, client)
@@ -445,8 +444,7 @@ class HoodieSparkSqlWriterInternal {
(parameters - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
.asInstanceOf[SparkRDDWriteClient[_]]
// Issue delete partitions
- instantTime = client.createNewInstantTime()
- client.startCommitWithTime(instantTime, commitActionType)
+ instantTime = client.startCommit(commitActionType)
val writeStatuses = DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, instantTime)
(writeStatuses, client)
@@ -508,7 +506,7 @@ class HoodieSparkSqlWriterInternal {
if (writeConfig.getRecordMerger.getRecordType == HoodieRecordType.SPARK && tableType == MERGE_ON_READ && writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != HoodieLogBlockType.PARQUET_DATA_BLOCK) {
throw new UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} only support parquet log.")
}
- instantTime = client.createNewInstantTime()
+ instantTime = client.startCommit(commitActionType)
// Convert to RDD[HoodieRecord]
val hoodieRecords = Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, writeConfig, parameters, avroRecordName,
@@ -520,7 +518,6 @@ class HoodieSparkSqlWriterInternal {
// Remove duplicates from incoming records based on existing keys from storage.
val dedupedHoodieRecords = handleInsertDuplicates(hoodieRecords, hoodieConfig, operation, jsc, parameters)
- client.startCommitWithTime(instantTime, commitActionType)
try {
val writeResult = DataSourceUtils.doWriteOperation(client, dedupedHoodieRecords, instantTime, operation,
preppedSparkSqlWrites || preppedWriteOperation)
@@ -825,23 +822,23 @@ class HoodieSparkSqlWriterInternal {
val overwriteOperationType = Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE))
.map(WriteOperationType.fromValue)
.orNull
- val instantTime = writeClient.createNewInstantTime()
val executor = mode match {
case _ if overwriteOperationType == null =>
// Don't need to overwrite
- new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, instantTime)
+ new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient)
case SaveMode.Append if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE =>
// INSERT OVERWRITE PARTITION uses Append mode
- new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, writeClient, instantTime)
+ new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, writeClient)
case SaveMode.Append if overwriteOperationType == WriteOperationType.BUCKET_RESCALE =>
- new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient, instantTime)
+ new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient)
case SaveMode.Overwrite if overwriteOperationType == WriteOperationType.INSERT_OVERWRITE_TABLE =>
- new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, writeClient, instantTime)
+ new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, writeClient)
case _ =>
throw new HoodieException(s"$mode with bulk_insert in row writer path is not supported yet");
}
val writeResult = executor.execute(df, tableConfig.isTablePartitioned)
+ val instantTime = executor.getInstantTime
try {
val (writeSuccessful, compactionInstant, clusteringInstant) = mode match {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index e729f938be1..37bd72b695c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -108,9 +108,7 @@ object AlterHoodieTableAddColumnsCommand extends SparkAdapterSupport with Loggin
)
val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, hoodieCatalogTable.tableType)
- val instantTime = client.createNewInstantTime()
-
- client.startCommitWithTime(instantTime, commitActionType)
+ val instantTime = client.startCommit(commitActionType)
client.preWrite(instantTime, WriteOperationType.ALTER_SCHEMA, hoodieCatalogTable.metaClient)
val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index cf1083f272d..40132b5e54b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -273,8 +273,7 @@ object AlterTableCommand extends Logging {
.build()
val commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType)
- val instantTime = client.createNewInstantTime()
- client.startCommitWithTime(instantTime, commitActionType)
+ val instantTime = client.startCommit(commitActionType)
client.setOperationType(WriteOperationType.ALTER_SCHEMA)
val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
index 54527c7a799..7f5cb1c7a07 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
@@ -94,11 +94,10 @@ class TestMetadataTableSupport extends HoodieSparkClientTestBase {
assertEquals(timestamp0, instants.get(4).requestedTime());
// Insert second batch.
- String timestamp1 = "20241015000000001";
+ String timestamp1 = writeClient.startCommit(REPLACE_COMMIT_ACTION);
List<HoodieRecord> records1 = dataGen.generateInserts(timestamp1, 50);
JavaRDD<HoodieRecord> dataset1 = jsc.parallelize(records1, 2);
- writeClient.startCommitWithTime(timestamp1, REPLACE_COMMIT_ACTION);
writeClient.insertOverwriteTable(dataset1, timestamp1);
// Validate.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index f2b76780f46..1882a78626a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -324,15 +324,13 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
writeConfig2.setSchema(writerSchema1);
final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
- final String nextCommitTime21 = "0021";
- startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21, tableType);
+ final String nextCommitTime21 = startSchemaEvolutionTransaction(metaClient, client2, tableType);
// Start concurrent txn 003 alter table schema
HoodieWriteConfig writeConfig3 = HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
writeConfig3.setSchema(writerSchema2);
final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
- final String nextCommitTime31 = "0031";
- startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31, tableType);
+ final String nextCommitTime31 = startSchemaEvolutionTransaction(metaClient, client3, tableType);
Properties props = new TypedProperties();
HoodieWriteConfig tableServiceWriteCfg = tableType.equals(MERGE_ON_READ)
@@ -1330,14 +1328,15 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
return result;
}
- private static void startSchemaEvolutionTransaction(HoodieTableMetaClient metaClient, SparkRDDWriteClient client, String nextCommitTime2, HoodieTableType tableType) throws IOException {
+ private static String startSchemaEvolutionTransaction(HoodieTableMetaClient metaClient, SparkRDDWriteClient client, HoodieTableType tableType) throws IOException {
String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.UPSERT, tableType);
- client.startCommitWithTime(nextCommitTime2, commitActionType);
- client.preWrite(nextCommitTime2, WriteOperationType.UPSERT, client.createMetaClient(true));
- HoodieInstant requested = metaClient.createNewInstant(HoodieInstant.State.REQUESTED, commitActionType, nextCommitTime2);
+ String instant = client.startCommit(commitActionType);
+ client.preWrite(instant, WriteOperationType.UPSERT, client.createMetaClient(true));
+ HoodieInstant requested = metaClient.createNewInstant(HoodieInstant.State.REQUESTED, commitActionType, instant);
HoodieCommitMetadata metadata = new HoodieCommitMetadata();
metadata.setOperationType(WriteOperationType.UPSERT);
client.createMetaClient(true).getActiveTimeline().transitionRequestedToInflight(requested, Option.of(metadata));
+ return instant;
}
private void createCommitWithUpserts(HoodieWriteConfig cfg, SparkRDDWriteClient client, String prevCommit,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
index 812b7518478..3bc5619ddfb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
@@ -19,16 +19,17 @@
package org.apache.hudi.spark.internal;
import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestUtils;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -71,15 +72,18 @@ class TestHoodieDataSourceInternalBatchWrite extends
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testDataSourceWriter(boolean populateMetaFields) throws Exception {
- testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP, populateMetaFields);
+ testDataSourceWriterInternal(Collections.emptyMap(), Collections.emptyMap(), populateMetaFields);
}
private void testDataSourceWriterInternal(Map<String, String> extraMetadata, Map<String, String> expectedExtraMetadata, boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
- String instantTime = "001";
// init writer
+ String instantTime;
+ try (SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), cfg)) {
+ instantTime = writeClient.startCommit();
+ }
+
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, extraMetadata, populateMetaFields, false);
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
@@ -154,7 +158,7 @@ class TestHoodieDataSourceInternalBatchWrite extends
extraMeta.put("keyB", "valB");
extraMeta.put("commit_extra_c", "valC");
// none of the keys has commit metadata key prefix.
- testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP, true);
+ testDataSourceWriterInternal(extraMeta, Collections.emptyMap(), true);
}
@ParameterizedTest
@@ -162,15 +166,14 @@ class TestHoodieDataSourceInternalBatchWrite extends
public void testMultipleDataSourceWrites(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
int partitionCounter = 0;
// execute N rounds
for (int i = 0; i < 2; i++) {
- String instantTime = "00" + i;
+ String instantTime = createInstant(cfg);
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
- new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, false);
+ new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), populateMetaFields, false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
@@ -209,15 +212,14 @@ class TestHoodieDataSourceInternalBatchWrite extends
public void testLargeWrites(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
int partitionCounter = 0;
// execute N rounds
for (int i = 0; i < 3; i++) {
- String instantTime = "00" + i;
+ String instantTime = createInstant(cfg);
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
- new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, false);
+ new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), populateMetaFields, false);
List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
Dataset<Row> totalInputRows = null;
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++, RANDOM.nextLong());
@@ -261,11 +263,10 @@ class TestHoodieDataSourceInternalBatchWrite extends
public void testAbort(boolean populateMetaFields) throws Exception {
// init config and table
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
- HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
- String instantTime0 = "00" + 0;
+ String instantTime0 = createInstant(cfg);
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
- new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, populateMetaFields, false);
+ new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), populateMetaFields, false);
DataWriter<InternalRow> writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, RANDOM.nextLong());
List<String> partitionPaths = Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
@@ -302,10 +303,10 @@ class TestHoodieDataSourceInternalBatchWrite extends
assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, size, Option.empty(), Option.empty());
// 2nd batch. abort in the end
- String instantTime1 = "00" + 1;
+ String instantTime1 = createInstant(cfg);
dataSourceInternalBatchWrite =
new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, sqlContext.sparkSession(), storageConf,
- Collections.EMPTY_MAP, populateMetaFields, false);
+ Collections.emptyMap(), populateMetaFields, false);
writer = dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, RANDOM.nextLong());
for (int j = 0; j < batches; j++) {
@@ -334,4 +335,12 @@ class TestHoodieDataSourceInternalBatchWrite extends
writer.write(internalRow);
}
}
+
+ private String createInstant(HoodieWriteConfig cfg) {
+ String instantTime;
+ try (SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(new HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), cfg)) {
+ instantTime = writeClient.startCommit();
+ }
+ return instantTime;
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 48d6743ad81..69872870b8c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -439,7 +439,7 @@ public class TestAsyncCompaction extends CompactionTestBase {
Set<HoodieFileGroupId> fileGroupsBeforeReplace = getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
// replace by using insertOverwrite
JavaRDD<HoodieRecord> replaceRecords = jsc.parallelize(dataGen.generateInserts(replaceInstantTime, numRecs), 1);
- client.startCommitWithTime(replaceInstantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
+ metaClient.getActiveTimeline().createRequestedCommitWithReplaceMetadata(replaceInstantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
client.insertOverwrite(replaceRecords, replaceInstantTime);
metaClient.reloadActiveTimeline();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 5411dc8c6c3..c3fc4ae85de 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -309,8 +309,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase {
saveMode = SaveMode.Overwrite)
Using(getHoodieWriteClient(getWriteConfig(hudiOpts))) { client =>
- val commitTime = client.startCommit
- client.startCommitWithTime(commitTime,
HoodieTimeline.REPLACE_COMMIT_ACTION)
+ val commitTime = client.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION)
val deletingPartition = dataGen.getPartitionPaths.last
val partitionList = Collections.singletonList(deletingPartition)
client.deletePartitions(partitionList, commitTime)
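
The two-step pattern of picking a commit time and then opening it with startCommitWithTime collapses into a single call. The same migration rendered in Java, as a fragment, assuming client is a SparkRDDWriteClient and partitionToDrop an existing partition path:

  // Before (deprecated): allocate a time, then open the commit for that time.
  //   String commitTime = client.startCommit();
  //   client.startCommitWithTime(commitTime, HoodieTimeline.REPLACE_COMMIT_ACTION);

  // After: one call opens the commit for the requested action and returns its instant time.
  String commitTime = client.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION);
  client.deletePartitions(Collections.singletonList(partitionToDrop), commitTime);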
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
index 11c12de2d79..99e1a93bf45 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
@@ -248,8 +248,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
// Create an inflight commit
     val client = HoodieCLIUtils.createHoodieWriteClient(spark, tablePath, Map.empty, Option(tableName))
val metaClient = createMetaClient(spark, tablePath)
- val firstInstant = client.createNewInstantTime()
- client.startCommitWithTime(firstInstant, HoodieTimeline.COMMIT_ACTION)
+ val firstInstant = client.startCommit(HoodieTimeline.COMMIT_ACTION)
     val hoodieTable = HoodieSparkTable.create(client.getConfig, client.getEngineContext)
     val timeLine = hoodieTable.getActiveTimeline
     val requested = hoodieTable.getInstantGenerator.createNewInstant(State.REQUESTED, HoodieTimeline.COMMIT_ACTION, firstInstant)
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
index 0d35013f960..609e5331f5e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
@@ -20,7 +20,7 @@
package org.apache.spark.sql.hudi.procedure
import org.apache.hudi.SparkDatasetMixin
-import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.{SparkRDDWriteClient, WriteClientTestUtils}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
import org.apache.hudi.common.table.HoodieTableConfig
@@ -87,7 +87,7 @@ class TestTTLProcedure extends HoodieSparkProcedureTestBase with SparkDatasetMix
dataGen.generateInsertsForPartition(instantTime, 10, partition)
.asInstanceOf[java.util.List[HoodieRecord[Nothing]]]
// Use this JavaRDD to call the insert method
- client.startCommitWithTime(instantTime, HoodieTimeline.COMMIT_ACTION)
+      WriteClientTestUtils.startCommitWithTime(client, instantTime, HoodieTimeline.COMMIT_ACTION)
       client.insert(spark.sparkContext.parallelize(records.asScala.toSeq).toJavaRDD(), instantTime)
}
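
Tests that still need to pin an explicit instant time (here, so the generated records and the commit share the same timestamp) now route through the new WriteClientTestUtils helper rather than the deprecated client method. A small Java sketch, assuming client is an open SparkRDDWriteClient, recordsRdd a JavaRDD of HoodieRecord, and instantTime a timestamp chosen by the test:

  import org.apache.hudi.client.WriteClientTestUtils;
  import org.apache.hudi.common.table.timeline.HoodieTimeline;

  // Test-only path: open a commit at an explicit, test-chosen instant time.
  WriteClientTestUtils.startCommitWithTime(client, instantTime, HoodieTimeline.COMMIT_ACTION);
  client.insert(recordsRdd, instantTime);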
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
index e81d15ff679..4a31f07c9a8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
@@ -19,14 +19,10 @@ package org.apache.hudi.utilities;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimeGenerator;
-import org.apache.hudi.common.table.timeline.TimeGenerators;
-import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -164,8 +160,6 @@ public class HoodieDropPartitionsTool implements Serializable {
   public String partitions = null;
   @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism for hoodie insert/upsert/delete", required = false)
   public int parallelism = 1500;
-  @Parameter(names = {"--instant-time", "-it"}, description = "instant time for delete table partitions operation.", required = false)
-  public String instantTime = null;
   @Parameter(names = {"--sync-hive-meta", "-sync"}, description = "Sync information to HMS.", required = false)
   public boolean syncToHive = false;
   @Parameter(names = {"--hive-database", "-db"}, description = "Database to sync to.", required = false)
@@ -213,7 +207,6 @@ public class HoodieDropPartitionsTool implements Serializable {
+ " --table-name " + tableName + ", \n"
+ " --partitions " + partitions + ", \n"
+ " --parallelism " + parallelism + ", \n"
- + " --instantTime " + instantTime + ", \n"
+ " --sync-hive-meta " + syncToHive + ", \n"
+ " --hive-database " + hiveDataBase + ", \n"
+ " --hive-table-name " + hiveTableName + ", \n"
@@ -245,7 +238,6 @@ public class HoodieDropPartitionsTool implements Serializable {
&& Objects.equals(runningMode, config.runningMode)
&& Objects.equals(tableName, config.tableName)
&& Objects.equals(partitions, config.partitions)
- && Objects.equals(instantTime, config.instantTime)
&& Objects.equals(syncToHive, config.syncToHive)
&& Objects.equals(hiveDataBase, config.hiveDataBase)
&& Objects.equals(hiveTableName, config.hiveTableName)
@@ -265,7 +257,7 @@ public class HoodieDropPartitionsTool implements Serializable {
   @Override
   public int hashCode() {
-    return Objects.hash(basePath, runningMode, tableName, partitions, instantTime,
+    return Objects.hash(basePath, runningMode, tableName, partitions,
        syncToHive, hiveDataBase, hiveTableName, hiveUserName, hivePassWord, hiveURL,
        hivePartitionsField, hiveUseJdbc, hiveHMSUris, partitionValueExtractorClass,
        sparkMaster, sparkMemory, propsFilePath, configs, hiveSyncIgnoreException, help);
@@ -294,12 +286,6 @@ public class HoodieDropPartitionsTool implements Serializable {
   public void run() {
     try {
-      if (StringUtils.isNullOrEmpty(cfg.instantTime)) {
-        TimeGenerator timeGenerator = TimeGenerators
-            .getTimeGenerator(HoodieTimeGeneratorConfig.defaultConfig(cfg.basePath),
-                HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()));
-        cfg.instantTime = TimelineUtils.generateInstantTime(true, timeGenerator);
-      }
LOG.info(cfg.toString());
Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
@@ -346,8 +332,8 @@ public class HoodieDropPartitionsTool implements Serializable {
     this.props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
     try (SparkRDDWriteClient<HoodieRecordPayload> client =
              UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.empty(), props)) {
       List<String> partitionsToDelete = Arrays.asList(cfg.partitions.split(","));
-      client.startCommitWithTime(cfg.instantTime, HoodieTimeline.REPLACE_COMMIT_ACTION);
-      client.deletePartitions(partitionsToDelete, cfg.instantTime);
+      String instantTime = client.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION);
+      client.deletePartitions(partitionsToDelete, instantTime);
}
}
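
With the --instant-time flag removed, the tool lets the write client allocate the replace instant at the moment it starts the delete-partitions commit, so there is no longer a user-supplied timestamp to validate. The core of the new flow, reduced to a sketch (configuration handling omitted; jsc, cfg, and props as in the tool):

  try (SparkRDDWriteClient<HoodieRecordPayload> client =
           UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, Option.empty(), props)) {
    List<String> partitionsToDelete = Arrays.asList(cfg.partitions.split(","));
    // The client picks the replace instant; no --instant-time needed.
    String instantTime = client.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION);
    client.deletePartitions(partitionsToDelete, instantTime);
  }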
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index cbd3207a503..a0ddfb1dbc8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -466,15 +466,14 @@ public class StreamSync implements Serializable, Closeable {
     try {
       // Refresh Timeline
       HoodieTableMetaClient metaClient = initializeMetaClientAndRefreshTimeline();
-      String instantTime = metaClient.createNewInstantTime();
-      Pair<InputBatch, Boolean> inputBatchAndUseRowWriter = readFromSource(instantTime, metaClient);
+      Pair<InputBatch, Boolean> inputBatchAndUseRowWriter = readFromSource(metaClient);
if (inputBatchAndUseRowWriter != null) {
InputBatch inputBatch = inputBatchAndUseRowWriter.getLeft();
boolean useRowWriter = inputBatchAndUseRowWriter.getRight();
initializeWriteClientAndRetryTableServices(inputBatch, metaClient);
-        result = writeToSinkAndDoMetaSync(instantTime, inputBatch, useRowWriter, metrics, overallTimerContext);
+        result = writeToSinkAndDoMetaSync(metaClient, inputBatch, useRowWriter, metrics, overallTimerContext);
}
// refresh schemas if need be before next batch
if (schemaProvider != null) {
@@ -553,7 +552,7 @@ public class StreamSync implements Serializable, Closeable {
   * @return Pair<InputBatch and Boolean> Input data read from upstream source, and boolean is true if the result should use the row writer path.
* @throws Exception in case of any Exception
*/
-  public Pair<InputBatch, Boolean> readFromSource(String instantTime, HoodieTableMetaClient metaClient) throws IOException {
+  public Pair<InputBatch, Boolean> readFromSource(HoodieTableMetaClient metaClient) throws IOException {
     // Retrieve the previous round checkpoints, if any
     Option<Checkpoint> checkpointToResume = StreamerCheckpointUtils.resolveCheckpointToResumeFrom(commitsTimelineOpt, cfg, props, metaClient);
LOG.info("Checkpoint to resume from : {}", checkpointToResume);
@@ -563,7 +562,7 @@ public class StreamSync implements Serializable, Closeable {
Pair<InputBatch, Boolean> sourceDataToSync = null;
while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
try {
-        sourceDataToSync = fetchFromSourceAndPrepareRecords(checkpointToResume, instantTime, metaClient);
+        sourceDataToSync = fetchFromSourceAndPrepareRecords(checkpointToResume, metaClient);
} catch (HoodieSourceTimeoutException e) {
if (curRetryCount >= maxRetryCount) {
throw e;
@@ -580,8 +579,7 @@ public class StreamSync implements Serializable, Closeable {
return sourceDataToSync;
}
-  private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<Checkpoint> resumeCheckpoint, String instantTime,
-      HoodieTableMetaClient metaClient) {
+  private Pair<InputBatch, Boolean> fetchFromSourceAndPrepareRecords(Option<Checkpoint> resumeCheckpoint, HoodieTableMetaClient metaClient) {
     hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Fetching next batch: " + cfg.targetTableName);
HoodieRecordType recordType = createRecordMerger(props).getRecordType();
if (recordType == HoodieRecordType.SPARK &&
HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
@@ -595,7 +593,6 @@ public class StreamSync implements Serializable, Closeable {
InputBatch inputBatch = inputBatchAndRowWriterEnabled.getLeft();
boolean useRowWriter = inputBatchAndRowWriterEnabled.getRight();
final Checkpoint checkpoint = inputBatch.getCheckpointForNextBatch();
- final SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
// handle no new data and no change in checkpoint
if (!cfg.allowCommitOnNoCheckpointChange &&
checkpoint.equals(resumeCheckpoint.orElse(null))) {
@@ -608,14 +605,7 @@ public class StreamSync implements Serializable, Closeable {
     // handle empty batch with change in checkpoint
     hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty: " + cfg.targetTableName);
-
-    if (useRowWriter) { // no additional processing required for row writer.
-      return Pair.of(inputBatch, true);
-    } else {
-      Option<JavaRDD<HoodieRecord>> recordsOpt = HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), schemaProvider,
-          recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
-      return Pair.of(new InputBatch(recordsOpt, checkpoint, schemaProvider), false);
-    }
+    return Pair.of(inputBatch, useRowWriter);
}
@VisibleForTesting
@@ -792,18 +782,19 @@ public class StreamSync implements Serializable, Closeable {
   /**
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive if needed.
    *
-   * @param instantTime         instant time to use for ingest.
+   * @param metaClient          meta client for the table
    * @param inputBatch          input batch that contains the records, checkpoint, and schema provider
    * @param useRowWriter        whether to use row writer
    * @param metrics             Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
-  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
+  private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(HoodieTableMetaClient metaClient, InputBatch inputBatch,
                                                                               boolean useRowWriter,
                                                                               HoodieIngestionMetrics metrics,
                                                                               Timer.Context overallTimerContext) {
boolean releaseResourcesInvoked = false;
+ String instantTime = startCommit(metaClient, !autoGenerateRecordKeys);
try {
Option<String> scheduledCompactionInstant = Option.empty();
// write to hudi and fetch result
@@ -928,15 +919,14 @@ public class StreamSync implements Serializable, Closeable {
*
* @return Instant time of the commit
*/
-  private String startCommit(String instantTime, boolean retryEnabled) {
+  private String startCommit(HoodieTableMetaClient metaClient, boolean retryEnabled) {
final int maxRetries = 2;
int retryNum = 1;
RuntimeException lastException = null;
while (retryNum <= maxRetries) {
try {
        String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
- writeClient.startCommitWithTime(instantTime, commitActionType);
- return instantTime;
+ return writeClient.startCommit(commitActionType, metaClient);
} catch (IllegalArgumentException ie) {
lastException = ie;
if (!retryEnabled) {
@@ -950,22 +940,23 @@ public class StreamSync implements Serializable, Closeable {
// No-Op
}
}
- instantTime = writeClient.createNewInstantTime();
}
throw lastException;
}
   private WriteClientWriteResult writeToSink(InputBatch inputBatch, String instantTime, boolean useRowWriter) {
WriteClientWriteResult writeClientWriteResult = null;
- instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
if (useRowWriter) {
       Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElseGet(() -> hoodieSparkContext.getSqlContext().emptyDataFrame());
       HoodieWriteConfig hoodieWriteConfig = prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
-      BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient, instantTime);
+      BaseDatasetBulkInsertCommitActionExecutor executor = new HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, writeClient);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, !HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
} else {
-      JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) inputBatch.getBatch().orElseGet(() -> hoodieSparkContext.emptyRDD());
+      HoodieRecordType recordType = createRecordMerger(props).getRecordType();
+      Option<JavaRDD<HoodieRecord>> recordsOption = HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), inputBatch.getSchemaProvider(),
+          recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
+      JavaRDD<HoodieRecord> records = recordsOption.orElseGet(() -> hoodieSparkContext.emptyRDD());
// filter dupes if needed
if (cfg.filterDupes) {
         records = DataSourceUtils.handleDuplicates(hoodieSparkContext, records, writeClient.getConfig(), false);
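
The net effect in StreamSync is that the instant time is no longer threaded from readFromSource through to the writer: writeToSinkAndDoMetaSync asks startCommit(metaClient, ...) for it right before writing, and the non-row-writer conversion to HoodieRecord now happens inside writeToSink where that instant is known. The retry loop keeps its shape, but every attempt asks the client for a fresh instant instead of regenerating one locally; roughly (simplified from the method above, retry backoff omitted):

  private String startCommit(HoodieTableMetaClient metaClient, boolean retryEnabled) {
    final int maxRetries = 2;
    RuntimeException lastException = null;
    for (int retryNum = 1; retryNum <= maxRetries; retryNum++) {
      try {
        String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
        // Each attempt allocates a brand-new instant on the timeline.
        return writeClient.startCommit(commitActionType, metaClient);
      } catch (IllegalArgumentException e) {
        lastException = e;
        if (!retryEnabled) {
          throw e;
        }
      }
    }
    throw lastException;
  }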
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 9d47ee43802..26caabc93da 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -3295,7 +3295,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     HoodieStreamer deltaStreamer = new HoodieStreamer(cfg, jsc);
     HoodieStreamer.StreamSyncService streamSyncService = (HoodieStreamer.StreamSyncService) deltaStreamer.getIngestionService();
     HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(HoodieTestUtils.getDefaultStorageConf()).setBasePath(tableBasePath).build();
-    InputBatch inputBatch = streamSyncService.getStreamSync().readFromSource("00000", metaClient).getLeft();
+    InputBatch inputBatch = streamSyncService.getStreamSync().readFromSource(metaClient).getLeft();
// Read from source and validate persistRdd call.
     JavaRDD<GenericRecord> sourceRdd = (JavaRDD<GenericRecord>) inputBatch.getBatch().get();
assertEquals(1000, sourceRdd.count());