yihua commented on code in PR #13229:
URL: https://github.com/apache/hudi/pull/13229#discussion_r2103489748
##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java:
##########
@@ -672,49 +689,61 @@ public List<WriteStatus> writeBatch(HoodieJavaWriteClient client, String newComm
   * @param expRecordsInThisCommit Expected number of records in this commit
   * @param expTotalRecords Expected number of records when scanned
   * @param expTotalCommits Expected number of commits (including this commit)
-  * @param doCommit
   * @throws Exception in case of error
   */
  public List<WriteStatus> writeBatch(HoodieJavaWriteClient client, String newCommitTime, String prevCommitTime,
                                      Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
                                      Function2<List<HoodieRecord>, String, Integer> recordGenFunction,
                                      Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> writeFn,
-                                     boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
-                                     boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator) throws Exception {
+                                     boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
+                                     boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator,
+                                     boolean skipCommit) throws Exception {
    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit);
    return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
        numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-       expTotalCommits, doCommit, filterForCommitTimeWithAssert, instantGenerator);
+       expTotalCommits, filterForCommitTimeWithAssert, instantGenerator, skipCommit);
  }
  public List<WriteStatus> writeBatch(HoodieJavaWriteClient client, String newCommitTime, String prevCommitTime,
                                      Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime, int numRecordsInThisCommit,
                                      Function3<List<HoodieRecord>, String, Integer, String> recordGenFunction,
                                      Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> writeFn,
-                                     boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits, boolean doCommit,
+                                     boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords, int expTotalCommits,
                                      boolean filterForCommitTimeWithAssert, String partition, InstantGenerator instantGenerator) throws Exception {
    List<HoodieRecord> records = recordGenFunction.apply(newCommitTime, numRecordsInThisCommit, partition);
    return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
        numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords,
-       expTotalCommits, doCommit, filterForCommitTimeWithAssert, instantGenerator);
+       expTotalCommits, filterForCommitTimeWithAssert, instantGenerator);
+  }
+
+  private List<WriteStatus> writeBatchHelper(HoodieJavaWriteClient client, String newCommitTime, String prevCommitTime,
+                                             Option<List<String>> commitTimesBetweenPrevAndNew, String initCommitTime,
+                                             int numRecordsInThisCommit, List<HoodieRecord> records,
+                                             Function3<List<WriteStatus>, HoodieJavaWriteClient, List<HoodieRecord>, String> writeFn,
+                                             boolean assertForCommit, int expRecordsInThisCommit, int expTotalRecords,
+                                             int expTotalCommits, boolean filterForCommitTimeWithAssert, InstantGenerator instantGenerator) throws IOException {
+    return writeBatchHelper(client, newCommitTime, prevCommitTime, commitTimesBetweenPrevAndNew, initCommitTime,
+        numRecordsInThisCommit, records, writeFn, assertForCommit, expRecordsInThisCommit, expTotalRecords, expTotalCommits,
+        filterForCommitTimeWithAssert, instantGenerator, false);
Review Comment:
nit: we have too many such private utils that confuse developers. We should
clean them up and only keep one or two in a follow-up.
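For that follow-up, one possible shape (a sketch with hypothetical names and parameters, not the actual `HoodieJavaClientTestHarness` signatures) is to keep a single canonical helper that takes every knob explicitly, and reduce the remaining overloads to one-line delegates that fill in defaults such as `skipCommit = false`:

```java
// Sketch only: collapsing several writeBatch-style overloads into one
// canonical method plus thin delegates. Names and parameters here are
// hypothetical, not the actual HoodieJavaClientTestHarness signatures.
public class WriteBatchOverloads {

  // Single canonical helper: every knob is an explicit parameter.
  static String writeBatch(String commitTime, int numRecords, boolean skipCommit) {
    return String.format("commit=%s records=%d skipCommit=%b", commitTime, numRecords, skipCommit);
  }

  // Legacy call sites keep compiling via a one-line delegate with a default.
  static String writeBatch(String commitTime, int numRecords) {
    return writeBatch(commitTime, numRecords, /* skipCommit */ false);
  }

  public static void main(String[] args) {
    System.out.println(writeBatch("001", 10));      // skipCommit defaults to false
    System.out.println(writeBatch("002", 5, true)); // explicit skipCommit
  }
}
```

This keeps one source of truth for the logic while trimming the overload surface the review flags.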
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java:
##########
@@ -273,34 +274,38 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception {
     HoodieWriteConfig cfg = getConfigBuilder(TRIP_EXAMPLE_SCHEMA, HoodieIndex.IndexType.BLOOM, HoodieFailedWritesCleaningPolicy.EAGER)
         .withParallelism(1, 1).withBulkInsertParallelism(1).withFinalizeWriteParallelism(1).withDeleteParallelism(1)
         .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).build())
Review Comment:
Let's check why the column stats index is disabled here.
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java:
##########
@@ -331,7 +331,6 @@ protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesClea
     Properties properties = new Properties();
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
-        .withAutoCommit(autoCommit)
Review Comment:
nit: remove the now-unused method argument
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java:
##########
@@ -235,7 +236,6 @@ private void testBootstrapCommon(boolean partitioned, boolean deltaCommit, Effec
     long timestamp = Instant.now().toEpochMilli();
     Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, partitions, bootstrapBasePath);
     HoodieWriteConfig config = getConfigBuilder(schema.toString(), partitioned)
-        .withAutoCommit(true)
Review Comment:
Have you revisited the test to see whether any adjustment is needed after removing this (i.e., auto commit changing from `true` to `false`)?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java:
##########
@@ -215,17 +232,17 @@ public void testRollbacksWithPartitionUpdate(HoodieTableType tableType, IndexTyp
     String commitTimeAtEpoch15 = TimelineUtils.generateInstantTime(false, timeGenerator);
     List<HoodieRecord> updatesAtEpoch15 = getUpdates(updatesAtEpoch5, p3, 15, payloadClass);
     WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch15);
-    assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15).collect());
-    // for the same bug pointed out earlier, (ignoring rollbacks while determining last instant while reading log records), this tests the HoodieFileGroupReader.
+    client.commit(commitTimeAtEpoch15, client.upsert(jsc().parallelize(updatesAtEpoch15, 2), commitTimeAtEpoch15));
+    // for the same bug pointed out earlier, (ignoring rollbacks while determining last instant while reading log records), this tests the HoodieMergedReadHandle.
Review Comment:
Nit: fix comments
```suggestion
    // for the same bug pointed out earlier, (ignoring rollbacks while determining last instant while reading log records), this tests the HoodieMergedReadHandle.
```
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java:
##########
@@ -57,7 +59,6 @@ protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {
     return HoodieWriteConfig.newBuilder().withPath(basePath)
         .withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
-        .withAutoCommit(autoCommit)
Review Comment:
nit: remove the method argument that is no longer used
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java:
##########
@@ -97,9 +96,15 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c
     this.operationType = operationType;
     this.extraMetadata = extraMetadata;
     this.taskContextSupplier = context.getTaskContextSupplier();
-    // TODO : Remove this once we refactor and move out autoCommit method from here, since the TxnManager is held in {@link BaseHoodieWriteClient}.
-    this.txnManagerOption = config.shouldAutoCommit() ? Option.of(new TransactionManager(config, table.getStorage())) : Option.empty();
+    this.txnManagerOption = Option.empty();
+    initializeTransactionSupportingCast();
+    if (!table.getStorageLayout().writeOperationSupported(operationType)) {
+      throw new UnsupportedOperationException("Executor " + this.getClass().getSimpleName()
+          + " is not compatible with table layout " + table.getStorageLayout().getClass().getSimpleName());
+    }
+  }
+
+  private void initializeTransactionSupportingCast() {
Review Comment:
nit: what does `SupportingCast` mean?
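If the method's job is to conditionally set up the transaction manager, a more self-describing name and shape could look like the sketch below. The types are stubs and `initTransactionManagerIfNeeded` is a hypothetical name, not the actual Hudi API:

```java
// Sketch only: a more self-describing name/shape for the initialization the
// comment asks about. Config and TransactionManager are stubs and
// initTransactionManagerIfNeeded is a hypothetical name, not the Hudi API.
import java.util.Optional;

public class TxnInit {

  static class Config {
    final boolean lockRequired;
    Config(boolean lockRequired) {
      this.lockRequired = lockRequired;
    }
  }

  static class TransactionManager {}

  // Create the transaction manager only when the configuration requires it.
  static Optional<TransactionManager> initTransactionManagerIfNeeded(Config config) {
    return config.lockRequired ? Optional.of(new TransactionManager()) : Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(initTransactionManagerIfNeeded(new Config(true)).isPresent());  // true
    System.out.println(initTransactionManagerIfNeeded(new Config(false)).isPresent()); // false
  }
}
```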
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java:
##########
@@ -123,7 +125,7 @@ private void testInsertAndCleanByVersions(
       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI) throws Exception {
     int maxVersions = 2; // keep upto 2 versions for each file
-    HoodieWriteConfig cfg = getConfigBuilder(true)
+    HoodieWriteConfig cfg = getConfigBuilder(false)
Review Comment:
Let's file a JIRA ticket as a follow-up to simplify these util methods where it's easy to do. The argument is no longer used and is confusing.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala:
##########
@@ -1860,7 +1844,7 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup
}
if (i == 1) {
val writeConfig = HoodieWriteConfig.newBuilder()
- .forTable("hoodie_test")
+ .forTable("hoodie_test")
Review Comment:
nit: indentation
##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/v2/ActiveTimelineV2.java:
##########
@@ -153,18 +153,27 @@ public HoodieInstant
createRequestedCommitWithReplaceMetadata(String instantTime
return instant;
}
+ public String createCompletionTime() {
Review Comment:
Let's revisit these new APIs as a follow-up to see if we can simplify them.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestGlobalIndexEnableUpdatePartitions.java:
##########
@@ -304,28 +321,28 @@ public void testUdpateSubsetOfRecUpdates(HoodieTableType tableType, IndexType in
     // 1st batch: insert 1,2
     String commitTimeAtEpoch0 = getCommitTimeAtUTC(0);
     WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch0);
-    assertNoWriteErrors(client.upsert(jsc().parallelize(allInserts.subList(0,2), 2), commitTimeAtEpoch0).collect());
+    client.commit(commitTimeAtEpoch0, client.upsert(jsc().parallelize(allInserts.subList(0,2), 2), commitTimeAtEpoch0));
     readTableAndValidate(metaClient, new int[] {0, 1}, p1, 0L);
     // 2nd batch: update records 1,2 and insert 3
     String commitTimeAtEpoch5 = getCommitTimeAtUTC(5);
     List<HoodieRecord> updatesAtEpoch5 = getUpdates(allInserts.subList(0,3), 5, payloadClass);
     WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch5);
-    assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5).collect());
+    client.commit(commitTimeAtEpoch5, client.upsert(jsc().parallelize(updatesAtEpoch5, 2), commitTimeAtEpoch5));
     readTableAndValidate(metaClient, new int[] {0, 1, 2}, p1, getExpectedTsMap(new int[] {0, 1, 2}, new Long[] {5L, 5L, 5L}));
     // 3rd batch: update records 1,2,3 and insert 4
     String commitTimeAtEpoch10 = getCommitTimeAtUTC(10);
     List<HoodieRecord> updatesAtEpoch10 = getUpdates(allInserts, 10, payloadClass);
     WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch10);
-    assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch10, 2), commitTimeAtEpoch10).collect());
+    client.commit(commitTimeAtEpoch10, client.upsert(jsc().parallelize(updatesAtEpoch10, 2), commitTimeAtEpoch10));
     readTableAndValidate(metaClient, new int[] {0, 1, 2, 3}, p1, getExpectedTsMap(new int[] {0, 1, 2, 3}, new Long[] {10L, 10L, 10L, 10L}));
     // 4th batch: update all from p1 to p2
     String commitTimeAtEpoch20 = getCommitTimeAtUTC(20);
     List<HoodieRecord> updatesAtEpoch20 = getUpdates(allInserts, p2, 20, payloadClass);
     WriteClientTestUtils.startCommitWithTime(client, commitTimeAtEpoch20);
-    assertNoWriteErrors(client.upsert(jsc().parallelize(updatesAtEpoch20, 2), commitTimeAtEpoch20).collect());
+    client.commit(commitTimeAtEpoch20, client.upsert(jsc().parallelize(updatesAtEpoch20, 2), commitTimeAtEpoch20));
Review Comment:
Nit: do we want to keep `assertNoWriteErrors`?
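If we do keep it, the two can be combined: validate the returned write statuses first, then commit. A minimal self-contained sketch of that pattern (stub `WriteStatus` and `commit`, not the real Hudi classes):

```java
// Sketch of the assert-then-commit pattern: write statuses are checked for
// errors before the commit is issued, so an assertNoWriteErrors-style check
// is not lost when switching to a commit(...) call. WriteStatus and commit
// are minimal stubs here, not the real Hudi classes.
import java.util.List;

public class AssertThenCommit {

  static class WriteStatus {
    final String fileId;
    final boolean hasErrors;
    WriteStatus(String fileId, boolean hasErrors) {
      this.fileId = fileId;
      this.hasErrors = hasErrors;
    }
  }

  static void assertNoWriteErrors(List<WriteStatus> statuses) {
    for (WriteStatus s : statuses) {
      if (s.hasErrors) {
        throw new AssertionError("write errors in file " + s.fileId);
      }
    }
  }

  static boolean commit(String commitTime, List<WriteStatus> statuses) {
    assertNoWriteErrors(statuses); // validate before committing
    return true;                   // stub: pretend the commit succeeded
  }

  public static void main(String[] args) {
    List<WriteStatus> ok = List.of(new WriteStatus("f1", false), new WriteStatus("f2", false));
    System.out.println(commit("0005", ok)); // prints true
  }
}
```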
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java:
##########
@@ -203,22 +208,23 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
     HoodieWriteConfig writeConfig = getConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER)
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(2)
-            .withInlineCompaction(true)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(3)
Review Comment:
Follow-up: check this test behavior change.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestHoodieFileSystemViews.java:
##########
@@ -95,7 +102,14 @@ public static List<Arguments> tableTypeMetadataFSVTypeArgs() {
   @ParameterizedTest
   @MethodSource("tableTypeMetadataFSVTypeArgs")
   public void testFileSystemViewConsistency(HoodieTableType tableType, boolean enableMdt, FileSystemViewStorageType storageType, int writeVersion) throws IOException {
+    metaClient.getStorage().deleteDirectory(new StoragePath(basePath));
     this.tableType = tableType;
+    Properties properties = new Properties();
Review Comment:
Next time we should split these independent changes into a separate PR so it's easier for reviewers.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala:
##########
@@ -99,7 +99,7 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     val inputDF0 = spark.read.json(spark.sparkContext.parallelize(records0, 2))
     inputDF0.write.format("org.apache.hudi")
       .options(options)
-      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
Review Comment:
nit: check this again
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/clean/TestCleanerInsertAndCleanByVersions.java:
##########
@@ -123,7 +125,7 @@ private void testInsertAndCleanByVersions(
       Function3<JavaRDD<WriteStatus>, SparkRDDWriteClient, JavaRDD<HoodieRecord>, String> upsertFn, boolean isPreppedAPI) throws Exception {
     int maxVersions = 2; // keep upto 2 versions for each file
-    HoodieWriteConfig cfg = getConfigBuilder(true)
+    HoodieWriteConfig cfg = getConfigBuilder(false)
Review Comment:
In general, this should be done as part of the refactoring step, instead of
a clean-up later on.
##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/offlinejob/TestHoodieCompactorJob.java:
##########
@@ -59,7 +59,6 @@ public void testHoodieCompactorWithOptionalClean(boolean skipClean) throws Excep
         .withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
-        .withAutoCommit(false)
Review Comment:
nit: Check whether disabling auto commit (previously enabled as the default) has any implications in the tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]