This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6ee6a52474a [HUDI-6093] Use the correct partitionToReplacedFileIds
during commit. (#8487)
6ee6a52474a is described below
commit 6ee6a52474a94c7d874dd81f84db881ac725cacc
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Jun 20 03:12:57 2023 -0700
[HUDI-6093] Use the correct partitionToReplacedFileIds during commit.
(#8487)
Specify the correct partitionToReplacedFileIds
while creating a commit in DeltaStreamer.
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/utilities/deltastreamer/DeltaSync.java | 18 +++++++--
.../deltastreamer/TestHoodieDeltaStreamer.java | 47 ++++++++++++++++++----
2 files changed, 53 insertions(+), 12 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index d56172ceba5..38582f0f8f4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -26,6 +26,7 @@ import org.apache.hudi.HoodieSparkSqlWriter;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.SparkAdapterSupport$;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -792,6 +793,8 @@ public class DeltaSync implements Serializable, Closeable {
instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
LOG.info("Starting commit : " + instantTime);
+ HoodieWriteResult writeResult;
+ Map<String, List<String>> partitionToReplacedFileIds = Collections.emptyMap();
JavaRDD<WriteStatus> writeStatusRDD;
switch (cfg.operation) {
case INSERT:
@@ -804,14 +807,20 @@ public class DeltaSync implements Serializable, Closeable {
writeStatusRDD = writeClient.bulkInsert(records, instantTime);
break;
case INSERT_OVERWRITE:
- writeStatusRDD = writeClient.insertOverwrite(records, instantTime).getWriteStatuses();
+ writeResult = writeClient.insertOverwrite(records, instantTime);
+ partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
+ writeStatusRDD = writeResult.getWriteStatuses();
break;
case INSERT_OVERWRITE_TABLE:
- writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
+ writeResult = writeClient.insertOverwriteTable(records, instantTime);
+ partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
+ writeStatusRDD = writeResult.getWriteStatuses();
break;
case DELETE_PARTITION:
List<String> partitions = records.map(record -> record.getPartitionPath()).distinct().collect();
- writeStatusRDD = writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();
+ writeResult = writeClient.deletePartitions(partitions, instantTime);
+ partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
+ writeStatusRDD = writeResult.getWriteStatuses();
break;
default:
throw new HoodieDeltaStreamerException("Unknown operation : " +
cfg.operation);
@@ -859,7 +868,8 @@ public class DeltaSync implements Serializable, Closeable {
}
}
}
- boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap(), extraPreCommitFunc);
+ boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds,
+ extraPreCommitFunc);
if (success) {
LOG.info("Commit " + instantTime + " successful!");
latestCheckpointWritten = checkpointStr;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 454b0c70db2..3b7ecf69510 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
@@ -47,6 +48,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.Option;
@@ -136,6 +138,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -2473,6 +2476,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false,
"partition_path");
String tableBasePath = basePath + "/test_parquet_table" + testNum;
+
+ // There should be fileIDs in the partition being deleted
+ assertFalse(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
+
HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
ParquetDFSSource.class.getName(),
null, PROPS_FILENAME_TEST_PARQUET, false,
@@ -2491,6 +2498,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
deltaStreamer.sync();
// No records should match the
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext,
HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+
+ // There should not be any fileIDs in the deleted partition
+ assertTrue(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
}
@Test
@@ -2519,19 +2529,34 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+ // Collect the fileIds before running HoodieDeltaStreamer
+ Set<String> beforeFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty());
+
// setting the operationType
cfg.operation = operationType;
// No new data => no commits.
cfg.sourceLimit = 0;
new HoodieDeltaStreamer(cfg, jsc).sync();
- TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
- TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+
+ if (operationType == WriteOperationType.INSERT_OVERWRITE) {
+ TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+ } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
+ final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
+ assertEquals(0, fsView.getLatestFileSlices("").count());
+ TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+
+ // Since the table has been overwritten all fileIDs before should have been replaced
+ Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty());
+ assertTrue(afterFileIDs.isEmpty());
+ }
cfg.sourceLimit = 1000;
new HoodieDeltaStreamer(cfg, jsc).sync();
- TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
- TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
+ TestHelpers.assertRecordCount(950, tableBasePath, sqlContext);
+ TestHelpers.assertDistanceCount(950, tableBasePath, sqlContext);
TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
@@ -2716,11 +2741,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
+ private Set<String> getAllFileIDsInTable(String tableBasePath, Option<String> partition) {
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
+ final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
+ Stream<HoodieBaseFile> baseFileStream = partition.isPresent() ? fsView.getLatestBaseFiles(partition.get()) : fsView.getLatestBaseFiles();
+ return baseFileStream.map(HoodieBaseFile::getFileId).collect(Collectors.toSet());
+ }
+
class TestDeltaSync extends DeltaSync {
public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession
sparkSession, SchemaProvider schemaProvider, TypedProperties props,
- JavaSparkContext jssc, FileSystem fs, Configuration
conf,
- Function<SparkRDDWriteClient, Boolean>
onInitializingHoodieWriteClient) throws IOException {
+ JavaSparkContext jssc, FileSystem fs, Configuration conf,
+ Function<SparkRDDWriteClient, Boolean>
onInitializingHoodieWriteClient) throws IOException {
super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf,
onInitializingHoodieWriteClient);
}
@@ -2849,5 +2881,4 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
arguments(true,
Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
);
}
-
}