This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 8cc067a10a9 [HUDI-8947] Fixing log file naming with MOR table writes using table version 6 (#12746)
8cc067a10a9 is described below
commit 8cc067a10a94e79875b80fcbdf15119cf3e6fb1e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Jan 31 08:23:42 2025 -0800
[HUDI-8947] Fixing log file naming with MOR table writes using table version 6 (#12746)
---------
Co-authored-by: Sagar Sumit <[email protected]>
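
For readers skimming the patch: with table version 6 (the pre-1.0 format), new log blocks written to a MOR file slice are expected to keep the slice's base instant time in the log file name and to continue the existing log version / write token sequence, rather than being named after the current delta commit as table version 8 log files are. To that end, the write handle now passes the latest file slice into createLogWriter for older table versions, and HoodieLogFormatWriter rolls over when the target log file already exists. A minimal, hypothetical sketch of the naming this implies (the helper class and values below are illustrative only, not Hudi API):

    // Java illustration (hypothetical helper, not part of the patch):
    // a table-version-6 MOR log file keeps the file slice's base instant time in
    // its name; later delta commits bump the log version instead of the instant.
    public class LogFileNamingSketch {
      static String logFileName(String fileId, String baseInstantTime, int version, String writeToken) {
        // mirrors the ".<fileId>_<baseInstant>.log.<version>_<writeToken>" convention
        return "." + fileId + "_" + baseInstantTime + ".log." + version + "_" + writeToken;
      }

      public static void main(String[] args) {
        String baseInstant = "001"; // delta commit that created the file slice, as in the new test
        // delta commits 002/003/004 keep "001" in the log file name and increment the version
        System.out.println(logFileName("file-group-1", baseInstant, 1, "0-1-0"));
        System.out.println(logFileName("file-group-1", baseInstant, 2, "0-1-0"));
      }
    }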
---
.../org/apache/hudi/io/HoodieAppendHandle.java | 12 +-
.../hudi/io/HoodieMergeHandleWithChangeLog.java | 4 +-
.../java/org/apache/hudi/io/HoodieWriteHandle.java | 53 +++--
.../providers/HoodieMetaClientProvider.java | 3 +
.../FlinkMergeAndReplaceHandleWithChangeLog.java | 2 +-
.../hudi/io/FlinkMergeHandleWithChangeLog.java | 2 +-
.../hudi/testutils/FunctionalTestHarness.java | 11 +-
.../SparkClientFunctionalTestHarness.java | 35 ++-
.../common/table/log/HoodieLogFormatWriter.java | 3 +
.../hudi/table/TestHoodieMergeOnReadTable.java | 119 ++++++++++
.../apache/hudi/testutils/DataSourceTestUtils.java | 11 +
.../apache/hudi/functional/TestMORDataSource.scala | 240 +++++++++++++--------
.../hudi/dml/TestMergeModeCommitTimeOrdering.scala | 3 +-
13 files changed, 374 insertions(+), 124 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 107e0b62ef6..038963168ce 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -178,11 +178,12 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
.map(HoodieBaseFile::getCommitTime);
}
- private void populateWriteStat(HoodieRecord record, HoodieDeltaWriteStat deltaWriteStat) {
+ private Option<FileSlice> populateWriteStatAndFetchFileSlice(HoodieRecord record, HoodieDeltaWriteStat deltaWriteStat) {
HoodieTableVersion tableVersion = hoodieTable.version();
String prevCommit;
String baseFile = "";
List<String> logFiles = new ArrayList<>();
+ Option<FileSlice> fileSlice = Option.empty();
if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
// table versions 8 and greater.
@@ -190,7 +191,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
// the cdc reader needs the base file metadata to have deterministic update sequence.
TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
- Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
+ fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
if (fileSlice.isPresent()) {
prevCommit = fileSlice.get().getBaseInstantTime();
baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
@@ -200,7 +201,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
} else {
// older table versions.
TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
- Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
+ fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
if (fileSlice.isPresent()) {
prevCommit = fileSlice.get().getBaseInstantTime();
baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
@@ -218,6 +219,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
deltaWriteStat.setPrevCommit(prevCommit);
deltaWriteStat.setBaseFile(baseFile);
deltaWriteStat.setLogFiles(logFiles);
+ return fileSlice;
}
private void init(HoodieRecord record) {
@@ -231,7 +233,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
writeStatus.setPartitionPath(partitionPath);
deltaWriteStat.setPartitionPath(partitionPath);
deltaWriteStat.setFileId(fileId);
- populateWriteStat(record, deltaWriteStat);
+ Option<FileSlice> fileSliceOpt = populateWriteStatAndFetchFileSlice(record, deltaWriteStat);
averageRecordSize = sizeEstimator.sizeEstimate(record);
try {
// Save hoodie partition meta in the partition path
@@ -243,7 +245,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O
String instantTime = config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)
? getInstantTimeForLogFile(record) : deltaWriteStat.getPrevCommit();
- this.writer = createLogWriter(instantTime);
+ this.writer = createLogWriter(instantTime, fileSliceOpt);
} catch (Exception e) {
LOG.error("Error in update task at commit " + instantTime, e);
writeStatus.setGlobalError(e);
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
index 08be71288f9..2f1b629096f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandleWithChangeLog.java
@@ -62,7 +62,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
partitionPath,
storage,
getWriterSchema(),
- createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+ createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, Option.empty()),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
@@ -80,7 +80,7 @@ public class HoodieMergeHandleWithChangeLog<T, I, K, O> extends HoodieMergeHandl
partitionPath,
storage,
getWriterSchema(),
- createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+ createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, Option.empty()),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 73f05789083..0bda8706ec3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
@@ -30,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.LogFileCreationCallback;
import org.apache.hudi.common.util.HoodieTimer;
@@ -234,25 +236,44 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
return new Schema.Parser().parse(config.getWriteSchema());
}
- protected HoodieLogFormat.Writer createLogWriter(String instantTime) {
- return createLogWriter(instantTime, null);
+ protected HoodieLogFormat.Writer createLogWriter(String instantTime, Option<FileSlice> fileSliceOpt) {
+ return createLogWriter(instantTime, null, fileSliceOpt);
}
- protected HoodieLogFormat.Writer createLogWriter(String instantTime, String fileSuffix) {
+ protected HoodieLogFormat.Writer createLogWriter(String instantTime, String fileSuffix, Option<FileSlice> fileSliceOpt) {
try {
- return HoodieLogFormat.newWriterBuilder()
- .onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
- .withFileId(fileId)
- .withInstantTime(instantTime)
- .withFileSize(0L)
- .withSizeThreshold(config.getLogFileMaxSize())
- .withStorage(storage)
- .withLogWriteToken(writeToken)
- .withFileCreationCallback(getLogCreationCallback())
- .withTableVersion(config.getWriteVersion())
- .withSuffix(fileSuffix)
- .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
- .build();
+ if (config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+ return HoodieLogFormat.newWriterBuilder()
+ .onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
+ .withFileId(fileId)
+ .withInstantTime(instantTime)
+ .withFileSize(0L)
+ .withSizeThreshold(config.getLogFileMaxSize())
+ .withStorage(storage)
+ .withLogWriteToken(writeToken)
+ .withFileCreationCallback(getLogCreationCallback())
+ .withTableVersion(config.getWriteVersion())
+ .withSuffix(fileSuffix)
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .build();
+ } else {
+ Option<HoodieLogFile> latestLogFile = fileSliceOpt.isPresent()
+ ? fileSliceOpt.get().getLatestLogFile()
+ : Option.empty();
+ return HoodieLogFormat.newWriterBuilder()
+ .onParentPath(FSUtils.constructAbsolutePath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
+ .withFileId(fileId)
+ .withInstantTime(instantTime)
+ .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
+ .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
+ .withSizeThreshold(config.getLogFileMaxSize())
+ .withStorage(storage)
+ .withLogWriteToken(latestLogFile.map(HoodieLogFile::getLogWriteToken).orElse(writeToken))
+ .withSuffix(fileSuffix)
+ .withFileCreationCallback(getLogCreationCallback())
+ .withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+ .build();
+ }
} catch (IOException e) {
throw new HoodieException("Creating logger writer with fileId: " + fileId + ", "
+ "delta commit time: " + instantTime + ", "
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
index 23f0da3ce83..dec51f6b849 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/providers/HoodieMetaClientProvider.java
@@ -19,6 +19,7 @@
package org.apache.hudi.testutils.providers;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -36,6 +37,8 @@ public interface HoodieMetaClientProvider {
HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props) throws IOException;
+ HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props, HoodieTableType tableType) throws IOException;
+
default HoodieTableFileSystemView getHoodieTableFileSystemView(
HoodieTableMetaClient metaClient, HoodieTimeline visibleActiveTimeline, List<StoragePathInfo> pathInfoList) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
index c99e801b6ba..acb4ec224ba 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeAndReplaceHandleWithChangeLog.java
@@ -64,7 +64,7 @@ public class FlinkMergeAndReplaceHandleWithChangeLog<T, I, K, O>
partitionPath,
getStorage(),
getWriterSchema(),
- createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+ createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, Option.empty()),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
index 0483096ce9a..d04b3b9717e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkMergeHandleWithChangeLog.java
@@ -62,7 +62,7 @@ public class FlinkMergeHandleWithChangeLog<T, I, K, O>
partitionPath,
getStorage(),
getWriterSchema(),
- createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX),
+ createLogWriter(instantTime, HoodieCDCUtils.CDC_LOGFILE_SUFFIX, Option.empty()),
IOUtils.getMaxMemoryPerPartitionMerge(taskContextSupplier, config));
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 79272b20aea..20c70a33aa8 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -122,15 +123,21 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider, Hoodie
}
@Override
- public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props) throws IOException {
+ public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props,
+ HoodieTableType tableType) throws IOException {
return HoodieTableMetaClient.newTableBuilder()
.setTableName(RAW_TRIPS_TEST_NAME)
- .setTableType(COPY_ON_WRITE)
+ .setTableType(tableType)
.setPayloadClass(HoodieAvroPayload.class)
.fromProperties(props)
.initTable(storageConf.newInstance(), basePath);
}
+ @Override
+ public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props) throws IOException {
+ return getHoodieMetaClient(storageConf, basePath, props, COPY_ON_WRITE);
+ }
+
@Override
public SparkRDDWriteClient getHoodieWriteClient(HoodieWriteConfig cfg) throws IOException {
return new SparkRDDWriteClient(context(), cfg);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index a24710ce201..aba8573073c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -79,10 +79,12 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -192,9 +194,15 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
@Override
public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props) throws IOException {
+ return getHoodieMetaClient(storageConf, basePath, props, COPY_ON_WRITE);
+ }
+
+ @Override
+ public HoodieTableMetaClient getHoodieMetaClient(StorageConfiguration<?> storageConf, String basePath, Properties props,
+ HoodieTableType tableType) throws IOException {
return HoodieTableMetaClient.newTableBuilder()
.setTableName(RAW_TRIPS_TEST_NAME)
- .setTableType(COPY_ON_WRITE)
+ .setTableType(tableType)
.setPayloadClass(HoodieAvroPayload.class)
.setTableVersion(ConfigUtils.getIntWithAltKeys(new TypedProperties(props), WRITE_TABLE_VERSION))
.fromProperties(props)
@@ -430,4 +438,29 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
timelineServicePort = (timelineServicePort + 1 - 1024) % (65536 - 1024) + 1024;
return timelineServicePort;
}
+
+ /**
+ * Check if two dataframes are equal.
+ *
+ * @param expectedDf expected dataframe
+ * @param actualDf actual dataframe
+ * @param validateColumns columns to validate
+ * @return true if dataframes are equal, false otherwise
+ */
+ public static boolean areDataframesEqual(Dataset<Row> expectedDf, Dataset<Row> actualDf, Set<String> validateColumns) {
+ // Normalize schema order
+ String[] sortedColumnNames = Arrays.stream(expectedDf.columns())
+ .filter(validateColumns::contains).sorted().toArray(String[]::new);
+
+ // Reorder columns in both datasets
+ Dataset<Row> df1Normalized = expectedDf.selectExpr(sortedColumnNames);
+ Dataset<Row> df2Normalized = actualDf.selectExpr(sortedColumnNames);
+
+ // Sort rows
+ Dataset<Row> df1Sorted = df1Normalized.sort("_row_key");
+ Dataset<Row> df2Sorted = df2Normalized.sort("_row_key");
+
+ // Check for differences
+ return df1Sorted.except(df2Sorted).isEmpty() && df2Sorted.except(df1Sorted).isEmpty();
+ }
}
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
index 5c4c141947d..e2c75aa9367 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java
@@ -102,6 +102,9 @@ public class HoodieLogFormatWriter implements HoodieLogFormat.Writer {
boolean created = false;
while (!created) {
try {
+ if (storage.exists(logFile.getPath())) {
+ rollOver();
+ }
// Block size does not matter as we will always manually auto-flush
createNewFile();
LOG.info("Created a new log file: {}", logFile);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index 34841ba2059..006a1bd80c9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -24,9 +24,11 @@ import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
@@ -63,6 +65,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -74,6 +77,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -85,6 +89,8 @@ import java.util.stream.Stream;
import static org.apache.hudi.common.table.timeline.InstantComparison.GREATER_THAN;
import static org.apache.hudi.common.table.timeline.InstantComparison.compareTimestamps;
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings;
+import static org.apache.hudi.config.HoodieWriteConfig.WRITE_TABLE_VERSION;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.apache.hudi.testutils.HoodieSparkClientTestHarness.buildProfile;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -222,6 +228,119 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
}
}
+ @Test
+ public void testUpsertPartitionerWithTableVersionSix() throws Exception {
+ HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder(true);
+ addConfigsForPopulateMetaFields(cfgBuilder, true);
+ cfgBuilder.withWriteTableVersion(6);
+ HoodieWriteConfig cfg = cfgBuilder.build();
+
+ // create meta client w/ the table version 6
+ Properties props = getPropertiesForKeyGen(true);
+ props.put(WRITE_TABLE_VERSION.key(), "6");
+ metaClient = getHoodieMetaClient(storageConf(), basePath(), props, HoodieTableType.MERGE_ON_READ);
+ dataGen = new HoodieTestDataGenerator();
+
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg);) {
+ // batch 1 insert
+ String newCommitTime = "001";
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 20);
+ JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
+ List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
+ assertNoWriteErrors(statuses);
+
+ HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+
+ Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().firstInstant();
+ assertTrue(deltaCommit.isPresent());
+ assertEquals("001", deltaCommit.get().requestedTime(), "Delta commit should be 001");
+
+ Option<HoodieInstant> commit = metaClient.getActiveTimeline().getCommitAndReplaceTimeline().firstInstant();
+ assertFalse(commit.isPresent());
+
+ List<StoragePathInfo> allFiles = listAllBaseFilesInPath(hoodieTable);
+ BaseFileOnlyView roView = getHoodieTableFileSystemView(metaClient,
+ metaClient.getCommitsTimeline().filterCompletedInstants(), allFiles);
+
+ Map<String, String> baseFileMapping = new HashMap<>();
+ Map<String, List<String>> baseFileToLogFileMapping = new HashMap<>();
+ BaseFileOnlyView finalRoView = roView;
+ Arrays.stream(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS).forEach(partitionPath -> {
+ String baseFileName = finalRoView.getLatestBaseFiles(partitionPath).collect(Collectors.toList()).get(0).getFileName();
+ baseFileMapping.put(partitionPath, baseFileName);
+ baseFileToLogFileMapping.put(baseFileName, new ArrayList<>());
+ });
+
+ writeAndValidateLogFileBaseInstantTimeMatches(client, "002", records, cfg, baseFileMapping, baseFileToLogFileMapping);
+ writeAndValidateLogFileBaseInstantTimeMatches(client, "003", records, cfg, baseFileMapping, baseFileToLogFileMapping);
+ writeAndValidateLogFileBaseInstantTimeMatches(client, "004", records, cfg, baseFileMapping, baseFileToLogFileMapping);
+ }
+ }
+
+ private void writeAndValidateLogFileBaseInstantTimeMatches(SparkRDDWriteClient client, String newCommitTime, List<HoodieRecord> records,
+ HoodieWriteConfig cfg, Map<String, String> baseFileMapping,
+ Map<String, List<String>> baseFileToLogFileMapping) throws IOException {
+ client.startCommitWithTime(newCommitTime);
+ List<HoodieRecord> newRecords = dataGen.generateUpdates(newCommitTime, records);
+ List<WriteStatus> statuses = client.upsert(jsc().parallelize(newRecords), newCommitTime).collect();
+ // validate the data itself
+ validateNewData(newRecords);
+ assertNoWriteErrors(statuses);
+
+ metaClient = HoodieTableMetaClient.reload(metaClient);
+ Option<HoodieInstant> deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
+ assertTrue(deltaCommit.isPresent());
+ assertEquals(newCommitTime, deltaCommit.get().requestedTime(), "Latest Delta commit should be 002");
+
+ HoodieTable hoodieTable = HoodieSparkTable.create(cfg, context(), metaClient);
+ HoodieTable finalHoodieTable = hoodieTable;
+ baseFileMapping.entrySet().forEach(entry -> {
+ FileSlice fileSlice = finalHoodieTable.getSliceView().getLatestFileSlices(entry.getKey()).collect(Collectors.toList()).get(0);
+ String baseFileName = entry.getValue();
+ String baseInstantTime = FSUtils.getCommitTime(baseFileName);
+ // validate the base instant time matches
+ List<HoodieLogFile> logFiles = fileSlice.getLogFiles().collect(Collectors.toList());
+ // except latest log file, all other files should be present in the tracking map.
+ int counter = 0;
+ while (counter < logFiles.size()) {
+ HoodieLogFile logFile = logFiles.get(counter);
+ if (counter == logFiles.size() - 1) {
+ // latest log file may not be present in the tracking map. lets add it to assist w/ for next round of validation.
+ baseFileToLogFileMapping.get(baseFileName).add(logFile.getFileName());
+ } else {
+ // all previous log files are expected to be matching
+ baseFileToLogFileMapping.get(baseFileName).contains(logFile.getFileName());
+ }
+ // validate that base instant time matches
+ assertEquals(baseInstantTime, FSUtils.getDeltaCommitTimeFromLogPath(logFile.getPath()));
+ counter++;
+ }
+ }
+ );
+ }
+
+ private void validateNewData(List<HoodieRecord> newRecords) {
+ Dataset<Row> inputDf = spark().read().json(jsc().parallelize(recordsToStrings(newRecords), 2)).drop("partition");
+ // get keys from the dataframe
+ List<String> updatedKeys = inputDf.select("_row_key").as(Encoders.STRING()).collectAsList();
+ Dataset<Row> outputDf = spark().read().format("hudi").load(basePath());
+ // drop metadata columns
+ outputDf = outputDf.drop(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD,
+ HoodieRecord.FILENAME_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD, HoodieRecord.COMMIT_SEQNO_METADATA_FIELD);
+ // filter the dataframe for updatedKeys only
+ outputDf = outputDf.filter(outputDf.col("_row_key").isin(updatedKeys.toArray()));
+ // assert that the dataframe is equal to the expected dataframe
+ // NOTE: we have excluded some columns from comparison such as map, date and array type fields as they were incompatible
+ // For example below is what data generated looks like vs what is read from the table (check `city_to_state` map: [CA] vs Map(LA -> CA))
+ // [false,029c1e56-3c03-42e3-a2eb-a45addd5b671,0.5550830309956531,0.013823731501093062,[CA],15,1322460250,1053705246,driver-002,0.8563083971473885,0.7050871729430999,
+ // [39.649862113946796,USD], WrappedArray(0, 0, 8, 19, -72),Canada,2015/03/17,rider-002,-5190452608208752867,0,WrappedArray([88.29247239885966,USD]),BLACK,0.7458226]
+ // [false,029c1e56-3c03-42e3-a2eb-a45addd5b671,0.5550830309956531,0.013823731501093062,Map(LA -> CA),1970-01-16,1322460250,1053705246,driver-002,0.8563083971473885,0.7050871729430999,
+ // [39.649862113946796,USD],0.529336,[B@372d7420,2015/03/17,rider-002,-5190452608208752867,0,WrappedArray([88.29247239885966,USD]),BLACK,0.7458226]
+ assertTrue(areDataframesEqual(inputDf, outputDf, new HashSet<>(Arrays.asList("_hoodie_is_deleted", "_row_key", "begin_lat", "begin_lon",
+ "current_ts", "distance_in_meters", "driver", "end_lat", "end_lon", "fare"))), "Dataframe mismatch");
+ }
+
// TODO: Enable metadata virtual keys in this test once the feature HUDI-2593 is completed
@Test
public void testLogFileCountsAfterCompaction() throws Exception {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index 7c1b596643c..08a82284898 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -172,9 +172,20 @@ public class DataSourceTestUtils {
.getLatestCompletionTime().orElse(null);
}
+ public static String latestCommitRequestTime(HoodieStorage storage, String basePath) {
+ return HoodieDataSourceHelpers.allCompletedCommitsCompactions(storage, basePath)
+ .lastInstant().map(instant -> instant.requestedTime()).orElse(null);
+ }
+
public static String latestDeltaCommitCompletionTime(HoodieStorage storage, String basePath) {
return HoodieDataSourceHelpers.allCompletedCommitsCompactions(storage, basePath)
.filter(instant -> HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()))
.getLatestCompletionTime().orElse(null);
}
+
+ public static String latestDeltaCommitRequest(HoodieStorage storage, String basePath) {
+ return HoodieDataSourceHelpers.allCompletedCommitsCompactions(storage, basePath)
+ .filter(instant -> HoodieTimeline.DELTA_COMMIT_ACTION.equals(instant.getAction()))
+ .lastInstant().map(instant -> instant.requestedTime()).orElse(null);
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
index 3de88e30284..30a32bdb814 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMORDataSource.scala
@@ -25,7 +25,7 @@ import org.apache.hudi.common.config.{HoodieMemoryConfig, HoodieMetadataConfig,
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT, TIMESTAMP_TIMEZONE_FORMAT, TIMESTAMP_TYPE_FIELD}
import org.apache.hudi.common.model._
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
-import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.testutils.{HoodieTestDataGenerator, HoodieTestUtils}
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.Option
@@ -33,7 +33,7 @@ import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.functional.TestCOWDataSource.convertColumnsToNullable
import org.apache.hudi.index.HoodieIndex.IndexType
-import org.apache.hudi.metadata.HoodieTableMetadataUtil.{metadataPartitionExists, PARTITION_NAME_SECONDARY_INDEX_PREFIX}
+import org.apache.hudi.metadata.HoodieTableMetadataUtil.{PARTITION_NAME_SECONDARY_INDEX_PREFIX, metadataPartitionExists}
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy
import org.apache.hudi.testutils.{DataSourceTestUtils, HoodieSparkClientTestBase}
@@ -99,19 +99,22 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
@ParameterizedTest
@CsvSource(Array(
// Inferred as COMMIT_TIME_ORDERING
- "AVRO, AVRO, avro, false,,", "AVRO, SPARK, parquet, false,,",
- "SPARK, AVRO, parquet, false,,", "SPARK, SPARK, parquet, false,,",
+ "AVRO, AVRO, avro, false,,,EIGHT", "AVRO, SPARK, parquet, false,,,EIGHT",
+ "SPARK, AVRO, parquet, false,,,EIGHT", "SPARK, SPARK, parquet, false,,,EIGHT",
// EVENT_TIME_ORDERING without precombine field
- "AVRO, AVRO, avro, false,,EVENT_TIME_ORDERING", "AVRO, SPARK, parquet, false,,EVENT_TIME_ORDERING",
- "SPARK, AVRO, parquet, false,,EVENT_TIME_ORDERING", "SPARK, SPARK, parquet, false,,EVENT_TIME_ORDERING",
+ "AVRO, AVRO, avro, false,,EVENT_TIME_ORDERING,EIGHT", "AVRO, SPARK, parquet, false,,EVENT_TIME_ORDERING,EIGHT",
+ "SPARK, AVRO, parquet, false,,EVENT_TIME_ORDERING,EIGHT", "SPARK, SPARK, parquet, false,,EVENT_TIME_ORDERING,EIGHT",
// EVENT_TIME_ORDERING with empty precombine field
- "AVRO, AVRO, avro, true,,EVENT_TIME_ORDERING", "AVRO, SPARK, parquet, true,,EVENT_TIME_ORDERING",
- "SPARK, AVRO, parquet, true,,EVENT_TIME_ORDERING", "SPARK, SPARK, parquet, true,,EVENT_TIME_ORDERING",
+ "AVRO, AVRO, avro, true,,EVENT_TIME_ORDERING,EIGHT", "AVRO, SPARK, parquet, true,,EVENT_TIME_ORDERING,EIGHT",
+ "SPARK, AVRO, parquet, true,,EVENT_TIME_ORDERING,EIGHT", "SPARK, SPARK, parquet, true,,EVENT_TIME_ORDERING,EIGHT",
// Inferred as EVENT_TIME_ORDERING
- "AVRO, AVRO, avro, true, timestamp,", "AVRO, SPARK, parquet, true, timestamp,",
- "SPARK, AVRO, parquet, true, timestamp,", "SPARK, SPARK, parquet, true, timestamp,"))
+ "AVRO, AVRO, avro, true, timestamp,,EIGHT", "AVRO, SPARK, parquet, true, timestamp,,EIGHT",
+ "SPARK, AVRO, parquet, true, timestamp,,EIGHT", "SPARK, SPARK, parquet, true, timestamp,,EIGHT",
+ // test table version 6
+ "AVRO, AVRO, avro, true,timestamp,EVENT_TIME_ORDERING,SIX",
+ "AVRO, AVRO, avro, true,timestamp,COMMIT_TIME_ORDERING,SIX"))
def testCount(readType: HoodieRecordType, writeType: HoodieRecordType, logType: String,
- hasPreCombineField: Boolean, precombineField: String, recordMergeMode: String) {
+ hasPreCombineField: Boolean, precombineField: String, recordMergeMode: String, tableVersion: String): Unit = {
var (_, readOpts) = getWriterReaderOpts(readType)
var (writeOpts, _) = getWriterReaderOpts(writeType)
readOpts = readOpts ++ Map(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> logType)
@@ -123,14 +126,24 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
writeOpts = writeOpts ++ Map(DataSourceWriteOptions.PRECOMBINE_FIELD.key ->
(if (isNullOrEmpty(precombineField)) "" else precombineField))
}
- val firstWriteOpts = if (isNullOrEmpty(recordMergeMode)) {
- writeOpts
- } else {
- writeOpts ++ Map(HoodieWriteConfig.RECORD_MERGE_MODE.key -> recordMergeMode)
+ if (!isNullOrEmpty(recordMergeMode)) {
+ writeOpts = writeOpts ++ Map(HoodieWriteConfig.RECORD_MERGE_MODE.key -> recordMergeMode)
}
if (isNullOrEmpty(recordMergeMode)) {
assertFalse(writeOpts.contains(HoodieWriteConfig.RECORD_MERGE_MODE.key))
}
+ val firstWriteOpts = if (tableVersion.equals("SIX")) {
+ writeOpts = writeOpts ++ Map(HoodieWriteConfig.WRITE_TABLE_VERSION.key() -> HoodieTableVersion.SIX.versionCode().toString)
+ writeOpts = writeOpts - HoodieWriteConfig.RECORD_MERGE_MODE.key
+ if (recordMergeMode.equals(RecordMergeMode.COMMIT_TIME_ORDERING.name)) {
+ writeOpts = writeOpts ++ Map(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> classOf[OverwriteWithLatestAvroPayload].getName)
+ } else if (recordMergeMode.equals(RecordMergeMode.EVENT_TIME_ORDERING.name)) {
+ writeOpts = writeOpts ++ Map(DataSourceWriteOptions.PAYLOAD_CLASS_NAME.key() -> classOf[DefaultHoodieRecordPayload].getName)
+ }
+ writeOpts
+ } else {
+ writeOpts
+ }
// First Operation:
// Producing parquet files to three default partitions.
@@ -155,7 +168,13 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
} else {
recordMergeMode
}
- val expectedConfigs = (Map(HoodieTableConfig.RECORD_MERGE_MODE.key -> expectedMergeMode) ++
+ val expectedConfigs = (
+ if (tableVersion.equals("EIGHT")) {
+ Map(HoodieTableConfig.RECORD_MERGE_MODE.key -> expectedMergeMode,
+ HoodieTableConfig.VERSION.key -> HoodieTableVersion.EIGHT.versionCode().toString)
+ } else {
+ Map(HoodieTableConfig.VERSION.key -> HoodieTableVersion.SIX.versionCode().toString)
+ } ++
(if (hasPreCombineField && !isNullOrEmpty(precombineField)) {
Map(HoodieTableConfig.PRECOMBINE_FIELD.key -> precombineField)
} else {
@@ -167,7 +186,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
Seq(HoodieTableConfig.PRECOMBINE_FIELD.key)
}).asJava
HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, nonExistentConfigs)
- val commit1CompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ val commit1CompletionTime = if (tableVersion.equals("EIGHT")) {
+ DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ } else {
+ DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+ }
val hudiSnapshotDF1 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -185,7 +208,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
.mode(SaveMode.Append)
.save(basePath)
HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, nonExistentConfigs)
- val commit2CompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ val commit2CompletionTime = if (tableVersion.equals("EIGHT")) {
+ DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ } else {
+ DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+ }
val hudiSnapshotDF2 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -198,49 +225,54 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
assertEquals(100, hudiSnapshotDF2.join(hudiSnapshotDF1, Seq("_hoodie_record_key"), "left").count())
// incremental view
- // base file only
- val hudiIncDF1 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
- .option(DataSourceReadOptions.END_COMMIT.key, commit1CompletionTime)
- .load(basePath)
- assertEquals(100, hudiIncDF1.count())
- assertEquals(1, hudiIncDF1.select("_hoodie_commit_time").distinct().count())
- assertEquals(commit1Time, hudiIncDF1.select("_hoodie_commit_time").head().get(0).toString)
- hudiIncDF1.show(1)
- // log file only
- val hudiIncDF2 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
- .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
- .load(basePath)
- assertEquals(100, hudiIncDF2.count())
- assertEquals(1, hudiIncDF2.select("_hoodie_commit_time").distinct().count())
- assertEquals(commit2Time, hudiIncDF2.select("_hoodie_commit_time").head().get(0).toString)
- hudiIncDF2.show(1)
-
- // base file + log file
- val hudiIncDF3 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
- .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
- .load(basePath)
- assertEquals(100, hudiIncDF3.count())
- // log file being load
- assertEquals(1, hudiIncDF3.select("_hoodie_commit_time").distinct().count())
- assertEquals(commit2Time, hudiIncDF3.select("_hoodie_commit_time").head().get(0).toString)
-
- // Test incremental query has no instant in range
- val emptyIncDF = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, "000")
- .option(DataSourceReadOptions.END_COMMIT.key, "001")
- .load(basePath)
- assertEquals(0, emptyIncDF.count())
+ // validate incremental queries only for table version 8
+ // 1.0 reader (table version 8) supports incremental query reads using completion time
+ if (tableVersion.equals("EIGHT")) {
+ // base file only
+ val hudiIncDF1 = spark.read.format("org.apache.hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+ .option(DataSourceReadOptions.END_COMMIT.key, commit1CompletionTime)
+ .load(basePath)
+ assertEquals(100, hudiIncDF1.count())
+ assertEquals(1, hudiIncDF1.select("_hoodie_commit_time").distinct().count())
+ assertEquals(commit1Time, hudiIncDF1.select("_hoodie_commit_time").head().get(0).toString)
+ hudiIncDF1.show(1)
+
+ // log file only
+ val hudiIncDF2 = spark.read.format("org.apache.hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit2CompletionTime)
+ .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
+ .load(basePath)
+ assertEquals(100, hudiIncDF2.count())
+ assertEquals(1, hudiIncDF2.select("_hoodie_commit_time").distinct().count())
+ assertEquals(commit2Time, hudiIncDF2.select("_hoodie_commit_time").head().get(0).toString)
+ hudiIncDF2.show(1)
+
+ // base file + log file
+ val hudiIncDF3 = spark.read.format("org.apache.hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+ .option(DataSourceReadOptions.END_COMMIT.key, commit2CompletionTime)
+ .load(basePath)
+ assertEquals(100, hudiIncDF3.count())
+ // log file being load
+ assertEquals(1, hudiIncDF3.select("_hoodie_commit_time").distinct().count())
+ assertEquals(commit2Time, hudiIncDF3.select("_hoodie_commit_time").head().get(0).toString)
+
+ // Test incremental query has no instant in range
+ val emptyIncDF = spark.read.format("org.apache.hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, "000")
+ .option(DataSourceReadOptions.END_COMMIT.key, "001")
+ .load(basePath)
+ assertEquals(0, emptyIncDF.count())
+ }
// Unmerge
val hudiSnapshotSkipMergeDF2 = spark.read.format("org.apache.hudi")
@@ -269,7 +301,11 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
.mode(SaveMode.Append)
.save(basePath)
HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, nonExistentConfigs)
- val commit3CompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ val commit3CompletionTime = if (tableVersion.equals("EIGHT")) {
+ DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ } else {
+ DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+ }
val hudiSnapshotDF3 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
@@ -284,22 +320,26 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
hudiSnapshotDF3.join(hudiSnapshotDF2, Seq("_hoodie_record_key", "_hoodie_commit_time"), "inner").count())
// incremental query from commit2Time
- val hudiIncDF4 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
- .load(basePath)
- assertEquals(50, hudiIncDF4.count())
-
- // skip merge incremental view
- // including commit 2 and commit 3
- val hudiIncDF4SkipMerge = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
- .option(DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
- .load(basePath)
- assertEquals(250, hudiIncDF4SkipMerge.count())
+ // validate incremental queries only for table version 8
+ // 1.0 reader (table version 8) supports incremental query reads using completion time
+ if (tableVersion.equals("EIGHT")) {
+ val hudiIncDF4 = spark.read.format("org.apache.hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+ .load(basePath)
+ assertEquals(50, hudiIncDF4.count())
+
+ // skip merge incremental view
+ // including commit 2 and commit 3
+ val hudiIncDF4SkipMerge = spark.read.format("org.apache.hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit1CompletionTime)
+ .option(DataSourceReadOptions.REALTIME_MERGE.key, DataSourceReadOptions.REALTIME_SKIP_MERGE_OPT_VAL)
+ .load(basePath)
+ assertEquals(250, hudiIncDF4SkipMerge.count())
+ }
// Fourth Operation:
// Insert records to a new partition. Produced a new parquet file.
@@ -324,12 +364,16 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
hudiSnapshotDF1.join(hudiSnapshotDF4, Seq("_hoodie_record_key"), "inner").count())
// Incremental query, 50 from log file, 100 from base file of the new partition.
- val hudiIncDF5 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
- .load(basePath)
- assertEquals(150, hudiIncDF5.count())
+ // validate incremental queries only for table version 8
+ // 1.0 reader (table version 8) supports incremental query reads using completion time
+ if (tableVersion.equals("EIGHT")) {
+ val hudiIncDF5 = spark.read.format("org.apache.hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit3CompletionTime)
+ .load(basePath)
+ assertEquals(150, hudiIncDF5.count())
+ }
// Fifth Operation:
// Upsert records to the new partition. Produced a newer version of parquet file.
@@ -358,21 +402,29 @@ class TestMORDataSource extends HoodieSparkClientTestBase with SparkDatasetMixin
.mode(SaveMode.Append)
.save(basePath)
HoodieTestUtils.validateTableConfig(storage, basePath, expectedConfigs, nonExistentConfigs)
- val commit6CompletionTime = DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ val commit6CompletionTime = if (tableVersion.equals("EIGHT")) {
+ DataSourceTestUtils.latestCommitCompletionTime(storage, basePath)
+ } else {
+ DataSourceTestUtils.latestCommitRequestTime(storage, basePath)
+ }
val hudiSnapshotDF6 = spark.read.format("org.apache.hudi")
.options(readOpts)
.option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
.load(basePath + "/2020/01/10/*")
assertEquals(102, hudiSnapshotDF6.count())
- val hudiIncDF6 = spark.read.format("org.apache.hudi")
- .options(readOpts)
- .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
- .option(DataSourceReadOptions.START_COMMIT.key, commit6CompletionTime)
- .option(DataSourceReadOptions.END_COMMIT.key, commit6CompletionTime)
- .load(basePath)
- // even though compaction updated 150 rows, since preserve commit metadata is true, they won't be part of incremental query.
- // inserted 2 new row
- assertEquals(2, hudiIncDF6.count())
+ // validate incremental queries only for table version 8
+ // 1.0 reader (table version 8) supports incremental query reads using completion time
+ if (tableVersion.equals("EIGHT")) {
+ val hudiIncDF6 = spark.read.format("org.apache.hudi")
+ .options(readOpts)
+ .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
+ .option(DataSourceReadOptions.START_COMMIT.key, commit6CompletionTime)
+ .option(DataSourceReadOptions.END_COMMIT.key, commit6CompletionTime)
+ .load(basePath)
+ // even though compaction updated 150 rows, since preserve commit metadata is true, they won't be part of incremental query.
+ // inserted 2 new row
+ assertEquals(2, hudiIncDF6.count())
+ }
}
@Test
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
index 67e7d992b19..5161e622a7c 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestMergeModeCommitTimeOrdering.scala
@@ -28,12 +28,11 @@ import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.validateTableConf
class TestMergeModeCommitTimeOrdering extends HoodieSparkSqlTestBase {
- // TODO(HUDI-8938): add "mor,true,true,6" after the fix
Seq(
"cow,8,false,false", "cow,8,false,true", "cow,8,true,false",
"cow,6,true,false", "cow,6,true,true",
"mor,8,false,false", "mor,8,false,true", "mor,8,true,false",
- "mor,6,true,false").foreach { args =>
+ "mor,6,true,true").foreach { args =>
val argList = args.split(',')
val tableType = argList(0)
val tableVersion = argList(1)