codope commented on code in PR #12746:
URL: https://github.com/apache/hudi/pull/12746#discussion_r1936709737
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java:
##########
@@ -256,6 +256,7 @@ public Writer build() throws IOException {
if (versionAndWriteToken.isPresent()) {
logVersion = versionAndWriteToken.get().getKey();
logWriteToken = versionAndWriteToken.get().getValue();
+ //logVersion++; // bump up the version so the new file has the next log version.
Review Comment:
So this is not needed, right? Please remove it.
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala:
##########
@@ -30,10 +30,11 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
// TODO(HUDI-8938): add "mor,true,true,6" after the fix
Seq(
- "cow,8,false,false", "cow,8,false,true", "cow,8,true,false",
+ /*"cow,8,false,false", "cow,8,false,true", "cow,8,true,false",
Review Comment:
Do we need to comment out this test?
##########
hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java:
##########
@@ -102,8 +102,11 @@ private FSDataOutputStream getOutputStream() throws IOException {
boolean created = false;
while (!created) {
try {
+ if (storage.exists(logFile.getPath())) {
Review Comment:
Discussed offline: this isn't extra overhead over what we already had in 0.x. But we need to understand two things:
1. Why was it removed?
2. Is it really needed? It looks like an idempotency guard against fsview/task retries. Let's revisit later whether we can optimize the log format writer flow.
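For reference, a minimal, self-contained sketch of the guard-and-roll-over pattern the `storage.exists(...)` check reintroduces. It uses plain `java.nio.file` instead of Hudi's `HoodieStorage`, and the `nextVersion` helper is hypothetical, so treat it as an illustration of the idempotency idea, not the actual writer code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class LogWriterSketch {

  // If a retried task finds the log file already on storage, roll over to the
  // next log version instead of failing with FileAlreadyExistsException.
  static Path openForWrite(Path logFile) throws IOException {
    while (Files.exists(logFile)) {
      logFile = nextVersion(logFile); // hypothetical roll-over helper
    }
    Files.createFile(logFile);
    return logFile;
  }

  // Bumps the numeric ".N" version suffix, e.g. "file.log.1" -> "file.log.2".
  static Path nextVersion(Path logFile) {
    String name = logFile.getFileName().toString();
    int dot = name.lastIndexOf('.');
    int version = Integer.parseInt(name.substring(dot + 1)) + 1;
    return logFile.resolveSibling(name.substring(0, dot + 1) + version);
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("log-sketch");
    Path first = openForWrite(dir.resolve("file.log.1"));
    Path retry = openForWrite(dir.resolve("file.log.1")); // simulated task retry
    System.out.println(first.getFileName()); // file.log.1
    System.out.println(retry.getFileName()); // file.log.2
  }
}
```

The point of the existence check is that a retried task never clobbers or double-opens a file a previous attempt already created; it pays one extra storage round trip per open, which matches the pre-1.x behavior.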
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala:
##########
@@ -225,7 +226,7 @@ class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
}
// TODO(HUDI-8468): add COW test after supporting COMMIT_TIME_ORDERING in MERGE INTO for COW
- test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table "
+ /*test(s"Test merge operations with COMMIT_TIME_ORDERING for $tableType table "
Review Comment:
Same here: do we need to comment out this test?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java:
##########
@@ -222,6 +225,96 @@ public void testUpsertPartitioner(boolean populateMetaFields) throws Exception {
}
}
+ @Test
+ public void testUpsertPartitionerWithTableVersion6() throws Exception {
+ HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
+ addConfigsForPopulateMetaFields(cfgBuilder, true);
+ cfgBuilder.withWriteTableVersion(6);
+ HoodieWriteConfig cfg = cfgBuilder.build();
+
+ // create meta client w/ the table version 6
+ Properties props = getPropertiesForKeyGen(true);
+ props.put(WRITE_TABLE_VERSION.key(), "6");
+ metaClient = getHoodieMetaClient(storageConf(), basePath(), props, HoodieTableType.MERGE_ON_READ);
+ dataGen = new HoodieTestDataGenerator();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+ // batch 1 insert
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+ List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+
+ Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
+ assertTrue(deltaCommit.isPresent());
+ assertEquals("001", deltaCommit.get().requestedTime(), "Delta commit should be 001");
+
+ Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
+ assertFalse(commit.isPresent());
+
+ List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
+ BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
+ metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
+
+ Map<String, String> baseFileMapping = new HashMap<>();
+ Map<String, List<String>> baseFileToLogFileMapping = new HashMap<>();
+ BaseFileOnlyView finalRoView = roView;
+
+ Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).forEach(partitionPath -> {
+ String baseFileName = finalRoView.getLatestBaseFiles(partitionPath).collect(Collectors.toList()).get(0).getFileName();
+ baseFileMapping.put(partitionPath, baseFileName);
+ baseFileToLogFileMapping.put(baseFileName, new ArrayList<>());
+ });
+
+ writeAndValidateLogFileBaseInstantTimeMatches(client, "002", records, cfg, baseFileMapping, baseFileToLogFileMapping);
Review Comment:
Can we somehow validate the data as well? Maybe just pick a key to update and ensure there's no data corruption.
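A back-of-the-envelope sketch of the suggested check, with a plain `Map` standing in for the table contents (a real test would upsert through `SparkRDDWriteClient` and read back via the file system view; the key names here are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertValidationSketch {

  public static void main(String[] args) {
    // Stand-in for the table contents after the first insert batch.
    Map<String, String> table = new HashMap<>();
    table.put("key-001", "v1");
    table.put("key-002", "v1");

    // Upsert a single key, then validate: the updated key has the new value,
    // untouched keys are unchanged, and no rows appeared or vanished.
    table.put("key-001", "v2");

    if (!"v2".equals(table.get("key-001"))) throw new AssertionError("update lost");
    if (!"v1".equals(table.get("key-002"))) throw new AssertionError("unrelated row corrupted");
    if (table.size() != 2) throw new AssertionError("row count changed");
    System.out.println("validated");
  }
}
```

The same three assertions (updated value visible, sibling rows untouched, row count stable) would catch the corruption scenario the comment is worried about.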
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]