This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ee6a52474a [HUDI-6093] Use the correct partitionToReplacedFileIds during commit. (#8487)
6ee6a52474a is described below

commit 6ee6a52474a94c7d874dd81f84db881ac725cacc
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Jun 20 03:12:57 2023 -0700

    [HUDI-6093] Use the correct partitionToReplacedFileIds during commit. (#8487)
    
    Specify the correct partitionToReplacedFileIds
    while creating a commit in DeltaStreamer.
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../hudi/utilities/deltastreamer/DeltaSync.java    | 18 +++++++--
 .../deltastreamer/TestHoodieDeltaStreamer.java     | 47 ++++++++++++++++++----
 2 files changed, 53 insertions(+), 12 deletions(-)
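
In short, the patch keeps the HoodieWriteResult returned by the replace-type operations
(INSERT_OVERWRITE, INSERT_OVERWRITE_TABLE, DELETE_PARTITION) and hands its map of
partition -> replaced file IDs to writeClient.commit(), where Collections.emptyMap() was
passed before, so the resulting replacecommit metadata can record which file groups were
replaced. A minimal, illustrative sketch of that pattern follows; the class and method
names are invented, only the INSERT_OVERWRITE case is shown, and it assumes the
five-argument commit() overload without the pre-commit hook. The actual change is in
DeltaSync, in the diff below.

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.Option;
import org.apache.spark.api.java.JavaRDD;

// Illustrative sketch only; the real logic lives in DeltaSync (see the diff below).
class ReplaceCommitSketch<T> {

  boolean writeAndCommit(SparkRDDWriteClient<T> writeClient, JavaRDD<HoodieRecord<T>> records,
                         String instantTime, WriteOperationType operation, String commitActionType) {
    // Default to an empty map for non-replace operations.
    Map<String, List<String>> partitionToReplacedFileIds = Collections.emptyMap();
    JavaRDD<WriteStatus> writeStatusRDD;
    if (operation == WriteOperationType.INSERT_OVERWRITE) {
      // Replace-type operations report the file groups they replaced.
      HoodieWriteResult result = writeClient.insertOverwrite(records, instantTime);
      partitionToReplacedFileIds = result.getPartitionToReplaceFileIds();
      writeStatusRDD = result.getWriteStatuses();
    } else {
      writeStatusRDD = writeClient.upsert(records, instantTime);
    }
    // Passing the real map (previously Collections.emptyMap()) lets the commit
    // metadata mark the old file groups as replaced.
    return writeClient.commit(instantTime, writeStatusRDD, Option.empty(), commitActionType,
        partitionToReplacedFileIds);
  }
}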

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index d56172ceba5..38582f0f8f4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -26,6 +26,7 @@ import org.apache.hudi.HoodieSparkSqlWriter;
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -792,6 +793,8 @@ public class DeltaSync implements Serializable, Closeable {
     instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
     LOG.info("Starting commit  : " + instantTime);
 
+    HoodieWriteResult writeResult;
+    Map<String, List<String>> partitionToReplacedFileIds = Collections.emptyMap();
     JavaRDD<WriteStatus> writeStatusRDD;
     switch (cfg.operation) {
       case INSERT:
@@ -804,14 +807,20 @@ public class DeltaSync implements Serializable, Closeable {
         writeStatusRDD = writeClient.bulkInsert(records, instantTime);
         break;
       case INSERT_OVERWRITE:
-        writeStatusRDD = writeClient.insertOverwrite(records, instantTime).getWriteStatuses();
+        writeResult = writeClient.insertOverwrite(records, instantTime);
+        partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
+        writeStatusRDD = writeResult.getWriteStatuses();
         break;
       case INSERT_OVERWRITE_TABLE:
-        writeStatusRDD = writeClient.insertOverwriteTable(records, instantTime).getWriteStatuses();
+        writeResult = writeClient.insertOverwriteTable(records, instantTime);
+        partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
+        writeStatusRDD = writeResult.getWriteStatuses();
         break;
       case DELETE_PARTITION:
         List<String> partitions = records.map(record -> record.getPartitionPath()).distinct().collect();
-        writeStatusRDD = writeClient.deletePartitions(partitions, instantTime).getWriteStatuses();
+        writeResult = writeClient.deletePartitions(partitions, instantTime);
+        partitionToReplacedFileIds = writeResult.getPartitionToReplaceFileIds();
+        writeStatusRDD = writeResult.getWriteStatuses();
         break;
       default:
         throw new HoodieDeltaStreamerException("Unknown operation : " + cfg.operation);
@@ -859,7 +868,8 @@ public class DeltaSync implements Serializable, Closeable {
           }
         }
       }
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, Collections.emptyMap(), extraPreCommitFunc);
+      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds,
+          extraPreCommitFunc);
       if (success) {
         LOG.info("Commit " + instantTime + " successful!");
         latestCheckpointWritten = checkpointStr;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 454b0c70db2..3b7ecf69510 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -47,6 +48,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
@@ -136,6 +138,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -2473,6 +2476,10 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     prepareParquetDFSSource(false, false, "source.avsc", "target.avsc",
         PROPS_FILENAME_TEST_PARQUET, PARQUET_SOURCE_ROOT, false, "partition_path");
     String tableBasePath = basePath + "/test_parquet_table" + testNum;
+
+    // There should be fileIDs in the partition being deleted
+    assertFalse(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
+
     HoodieDeltaStreamer deltaStreamer = new HoodieDeltaStreamer(
         TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT, ParquetDFSSource.class.getName(),
             null, PROPS_FILENAME_TEST_PARQUET, false,
@@ -2491,6 +2498,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     deltaStreamer.sync();
    // No records should match the HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION.
    TestHelpers.assertNoPartitionMatch(tableBasePath, sqlContext, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH);
+
+    // There should not be any fileIDs in the deleted partition
+    assertTrue(getAllFileIDsInTable(tableBasePath, Option.of(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH)).isEmpty());
   }
 
   @Test
@@ -2519,19 +2529,34 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
 
+    // Collect the fileIds before running HoodieDeltaStreamer
+    Set<String> beforeFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty());
+
     // setting the operationType
     cfg.operation = operationType;
     // No new data => no commits.
     cfg.sourceLimit = 0;
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
-    TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+
+    if (operationType == WriteOperationType.INSERT_OVERWRITE) {
+      TestHelpers.assertRecordCount(1000, tableBasePath, sqlContext);
+      TestHelpers.assertDistanceCount(1000, tableBasePath, sqlContext);
+      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+    } else if (operationType == WriteOperationType.INSERT_OVERWRITE_TABLE) {
+      HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
+      final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
+      assertEquals(0, fsView.getLatestFileSlices("").count());
+      TestHelpers.assertCommitMetadata("00000", tableBasePath, fs, 1);
+
+      // Since the table has been overwritten all fileIDs before should have been replaced
+      Set<String> afterFileIDs = getAllFileIDsInTable(tableBasePath, Option.empty());
+      assertTrue(afterFileIDs.isEmpty());
+    }
 
     cfg.sourceLimit = 1000;
     new HoodieDeltaStreamer(cfg, jsc).sync();
-    TestHelpers.assertRecordCount(1950, tableBasePath, sqlContext);
-    TestHelpers.assertDistanceCount(1950, tableBasePath, sqlContext);
+    TestHelpers.assertRecordCount(950, tableBasePath, sqlContext);
+    TestHelpers.assertDistanceCount(950, tableBasePath, sqlContext);
     TestHelpers.assertCommitMetadata("00001", tableBasePath, fs, 2);
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
@@ -2716,11 +2741,18 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
+  private Set<String> getAllFileIDsInTable(String tableBasePath, Option<String> partition) {
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jsc.hadoopConfiguration()).setBasePath(tableBasePath).build();
+    final HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, metaClient.getCommitsAndCompactionTimeline());
+    Stream<HoodieBaseFile> baseFileStream = partition.isPresent() ? fsView.getLatestBaseFiles(partition.get()) : fsView.getLatestBaseFiles();
+    return baseFileStream.map(HoodieBaseFile::getFileId).collect(Collectors.toSet());
+  }
+
   class TestDeltaSync extends DeltaSync {
 
    public TestDeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,
-                         JavaSparkContext jssc, FileSystem fs, Configuration conf,
-                         Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
+        JavaSparkContext jssc, FileSystem fs, Configuration conf,
+        Function<SparkRDDWriteClient, Boolean> onInitializingHoodieWriteClient) throws IOException {
      super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, onInitializingHoodieWriteClient);
     }
 
@@ -2849,5 +2881,4 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
         arguments(true, Collections.singletonList(TripsWithDistanceTransformer.class.getName()))
     );
   }
-
 }
