yihua commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2103489748


##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java:
##########
@@ -672,49 +689,61 @@ public List<WriteStatus> writeBatch(HoodieJavaWriteClient client, String newComm
    * @param expRecordsInThisCommit       Expected number of records in this commit
    * @param expTotalRecords              Expected number of records when scanned
    * @param expTotalCommits              Expected number of commits (including this commit)
-   * @param doCommit
    * @throws Exception in case of error
    */
   public List<WriteStatus> writeBatch(HoodieJavaWriteClient client, String newCommitTime, String prevCommitTime,
                                       Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
                                       Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
                                       Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> writeFn,
-                                      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
-                                      boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator) throws Exception {
+                                      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
+                                      boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator,
+                                      boolean skipCommit) throws Exception {
 
     List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
     return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
         numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-        expTotalCommits, doCommit, filterForCommitTimeWithAssert, instantGenerator);
+        expTotalCommits, filterForCommitTimeWithAssert, instantGenerator, skipCommit);
   }
 
   public List<WriteStatus> writeBatch(HoodieJavaWriteClient client, String newCommitTime, String prevCommitTime,
                                       Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
                                       Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction,
                                       Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> writeFn,
-                                      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
+                                      boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
                                       boolean filterForCommitTimeWithAssert,
                                       String partition, InstantGenerator instantGenerator) throws Exception {
 
     List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit, partition);
     return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
         numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-        expTotalCommits, doCommit, filterForCommitTimeWithAssert, instantGenerator);
+        expTotalCommits, filterForCommitTimeWithAssert, instantGenerator);
+  }
+
+  private List<WriteStatus> writeBatchHelper(HoodieJavaWriteClient client, String newCommitTime, String prevCommitTime,
+                                             Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+                                             int numRecordsInThisCommit, List<HoodieRecord> records,
+                                             Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> writeFn,
+                                             boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords,
+                                             int expTotalCommits, boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator) throws IOException {
+    return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+        numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits,
+        filterForCommitTimeWithAssert, instantGenerator, false);

Review Comment:
   nit: we have too many such private utils, which confuse developers. We should clean them up in a follow-up and keep only one or two.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java:
##########
@@ -273,34 +274,38 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).build())

Review Comment:
   Let's check why this is disabled



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java:
##########
@@ -331,7 +331,6 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea
     Properties properties = new Properties();
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
-        .withAutoCommit(autoCommit)

Review Comment:
   nit: remove the now-unused method argument



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java:
##########
@@ -235,7 +236,6 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     long timestamp = Instant.now().toEpochMilli();
     Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath);
     HoodieWriteConfig config = getConfigBuilder(schema.toString(), partitioned)
-        .withAutoCommit(true)

Review Comment:
   Have you revisited the test to see whether any adjustment is needed after removing this (i.e., auto commit changing from `true` to `false`)?



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java:
##########
@@ -215,17 +232,17 @@ public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexTyp
       String commitTimeAtEpoch15 = TimelineUtils.generateInstantTime(false, timeGenerator);
       List<HoodieRecord> updatesAtEpoch15 = getUpdates(updatesAtEpoch5, p3, 15, payloadClass);
       WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch15);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15).collect());
-      // for the same bug pointed out earlier, (ignoring rollbacks while determining last instant while reading log records), this tests the HoodieFileGroupReader.
+      client.commit(commitTimeAtEpoch15, client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15));
+      // for the same bug pointed out earlier, (ignoring rollbacks while determining last instant while reading log records), this tests the HoodieMergedReadHandle.

Review Comment:
   Nit: fix comments
   ```suggestion
         // for the same bug pointed out earlier, (ignoring rollbacks while determining last instant while reading log records), this tests the HoodieMergedReadHandle.
   ```



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java:
##########
@@ -57,7 +59,6 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
     return HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
-        .withAutoCommit(autoCommit)

Review Comment:
   nit: remove the unused method argument



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -97,9 +96,15 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c
     this.operationType = operationType;
     this.extraMetadata = extraMetadata;
     this.taskContextSupplier = context.getTaskContextSupplier();
-    // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
-    this.txnManagerOption = config.shouldAutoCommit()
-        ? Option.of(new TransactionManager(config, table.getStorage())) : Option.empty();
+    this.txnManagerOption = Option.empty();
+    initializeTransactionSupportingCast();
+    if (!table.getStorageLayout().writeOperationSupported(operationType)) {
+      throw new UnsupportedOperationException("Executor " + this.getClass().getSimpleName()
+          + " is not compatible with table layout " + table.getStorageLayout().getClass().getSimpleName());
+    }
+  }
+
+  private void initializeTransactionSupportingCast() {

Review Comment:
   nit: what does `SupportingCast` mean?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java:
##########
@@ -123,7 +125,7 @@ private void testInsertAndCleanByVersions(
       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
       throws Exception {
     int maxVersions = 2; // keep upto 2 versions for each file
-    HoodieWriteConfig cfg = getConfigBuilder(true)
+    HoodieWriteConfig cfg = getConfigBuilder(false)

Review Comment:
   Let's track a JIRA ticket as a follow-up to simplify these util methods where it's easy to do. The argument is no longer used and is quite confusing.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -1860,7 +1844,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
       }
       if (i == 1) {
         val writeConfig = HoodieWriteConfig.newBuilder()
-          .forTable("hoodie_test")
+            .forTable("hoodie_test")

Review Comment:
   nit: indentation



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java:
##########
@@ -153,18 +153,27 @@ public HoodieInstant createRequestedCommitWithReplaceMetadata(String instantTime
     return instant;
   }
 
+  public String createCompletionTime() {

Review Comment:
   Let's revisit these new APIs as a follow-up to see if we can simplify them.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java:
##########
@@ -304,28 +321,28 @@ public void testUdpateSubsetOfRecUpdates(HoodieTableType tableType, IndexType in
       // 1st batch: insert 1,2
       String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
       WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch0);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(allInserts.subList(0,2), 2), commitTimeAtEpoch0).collect());
+      client.commit(commitTimeAtEpoch0, client.upsert(jsc().parallelize(allInserts.subList(0,2), 2), commitTimeAtEpoch0));
       readTableAndValidate(metaClient, new int[] {0, 1}, p1, 0L);
 
       // 2nd batch: update records 1,2 and insert 3
       String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
       List<HoodieRecord> updatesAtEpoch5 = getUpdates(allInserts.subList(0,3), 5, payloadClass);
       WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch5);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect());
+      client.commit(commitTimeAtEpoch5, client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5));
       readTableAndValidate(metaClient, new int[] {0, 1, 2}, p1, getExpectedTsMap(new int[] {0, 1, 2}, new Long[] {5L, 5L, 5L}));
 
       // 3rd batch: update records 1,2,3 and insert 4
       String commitTimeAtEpoch10 = getCommitTimeAtUTC(10);
       List<HoodieRecord> updatesAtEpoch10 = getUpdates(allInserts, 10, payloadClass);
       WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch10);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch10, 2), commitTimeAtEpoch10).collect());
+      client.commit(commitTimeAtEpoch10, client.upsert(jsc().parallelize(updatesAtEpoch10, 2), commitTimeAtEpoch10));
       readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, getExpectedTsMap(new int[] {0, 1, 2, 3}, new Long[] {10L, 10L, 10L, 10L}));
 
       // 4th batch: update all from p1 to p2
       String commitTimeAtEpoch20 = getCommitTimeAtUTC(20);
       List<HoodieRecord> updatesAtEpoch20 = getUpdates(allInserts, p2, 20, payloadClass);
       WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch20);
-      assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 2), commitTimeAtEpoch20).collect());
+      client.commit(commitTimeAtEpoch20, client.upsert(jsc().parallelize(updatesAtEpoch20, 2), commitTimeAtEpoch20));

Review Comment:
   Nit: do we want to keep `assertNoWriteErrors`?
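   One possible shape, sketched below with stand-in types (not the real Hudi/Spark classes): collect the write statuses once, run the error assertion, and only then commit, so both the validation and the explicit commit are kept.

```java
// Self-contained sketch of keeping the error assertion alongside an explicit
// commit. WriteStatus, assertNoWriteErrors, and commitBatch are stand-ins for
// the real Hudi test utilities.
import java.util.List;

public class CommitWithAssertDemo {

  // Stand-in for Hudi's WriteStatus: a file id plus an error flag.
  record WriteStatus(String fileId, boolean hasErrors) {}

  // Fail fast if any write in the batch reported errors.
  static void assertNoWriteErrors(List<WriteStatus> statuses) {
    for (WriteStatus status : statuses) {
      if (status.hasErrors()) {
        throw new AssertionError("Write errors found in file " + status.fileId());
      }
    }
  }

  // Assert first, then commit: a failed write never becomes a completed commit.
  static String commitBatch(String commitTime, List<WriteStatus> statuses) {
    assertNoWriteErrors(statuses);
    return "committed " + commitTime;
  }

  public static void main(String[] args) {
    List<WriteStatus> statuses = List.of(
        new WriteStatus("f1", false), new WriteStatus("f2", false));
    System.out.println(commitBatch("001", statuses));
  }
}
```

   In the actual test this would mean collecting the upsert result into a local variable, asserting on it, and passing it to `client.commit(...)` instead of dropping the assertion.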



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java:
##########
@@ -203,22 +208,23 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
 
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
     HoodieWriteConfig writeConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(2)
-            .withInlineCompaction(true)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(3)

Review Comment:
   Follow-up: check this test behavior change.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieFileSystemViews.java:
##########
@@ -95,7 +102,14 @@ public static List<Arguments> tableTypeMetadataFSVTypeArgs() {
   @ParameterizedTest
   @MethodSource("tableTypeMetadataFSVTypeArgs")
   public void testFileSystemViewConsistency(HoodieTableType tableType, boolean enableMdt, FileSystemViewStorageType storageType, int writeVersion) throws IOException {
+    metaClient.getStorage().deleteDirectory(new StoragePath(basePath));
     this.tableType = tableType;
+    Properties properties = new Properties();

Review Comment:
   Next time we should split such independent changes into a separate PR so it's easier for reviewers.



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala:
##########
@@ -99,7 +99,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
     inputDF0.write.format("org.apache.hudi")
       .options(options)
-      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)

Review Comment:
   nit: check this again



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java:
##########
@@ -123,7 +125,7 @@ private void testInsertAndCleanByVersions(
       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI)
       throws Exception {
     int maxVersions = 2; // keep upto 2 versions for each file
-    HoodieWriteConfig cfg = getConfigBuilder(true)
+    HoodieWriteConfig cfg = getConfigBuilder(false)

Review Comment:
   In general, this should be done as part of the refactoring step, instead of a clean-up later on.



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java:
##########
@@ -59,7 +59,6 @@ public void testHoodieCompactorWithOptionalClean(boolean skipClean) throws Excep
         .withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
-        .withAutoCommit(false)

Review Comment:
   nit: Check whether disabling auto commit (previously enabled by default) has any implications for the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
