This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 61051a3ef97 [HUDI-5031] Fix MERGE INTO creates empty partition files 
when source table has partitions but target table does not (#6983)
61051a3ef97 is described below

commit 61051a3ef971b2066e81fee80f34f3243d839127
Author: Ming Wei <[email protected]>
AuthorDate: Thu Oct 19 07:28:01 2023 +0800

    [HUDI-5031] Fix MERGE INTO creates empty partition files when source table 
has partitions but target table does not (#6983)
    
    * [HUDI-5031] Fix MERGE INTO creates empty partition files when source 
table has partitions but target table does not
    
    Co-authored-by: jameswei <[email protected]>
    Co-authored-by: balaji.varadarajan <[email protected]>
---
 .../hudi/execution/CopyOnWriteInsertHandler.java   | 19 ++++-
 .../hudi/execution/SparkLazyInsertIterable.java    |  3 -
 .../spark/sql/hudi/TestMergeIntoTable2.scala       | 81 ++++++++++++++++++++++
 3 files changed, 99 insertions(+), 4 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
index fd932a66a0a..0191b8f9d3a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
@@ -27,7 +27,10 @@ import 
org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenRe
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -41,6 +44,8 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
 public class CopyOnWriteInsertHandler<T>
     implements HoodieConsumer<HoodieInsertValueGenResult<HoodieRecord>, 
List<WriteStatus>> {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(CopyOnWriteInsertHandler.class);
+
   private final HoodieWriteConfig config;
   private final String instantTime;
   private final boolean areRecordsSorted;
@@ -49,6 +54,9 @@ public class CopyOnWriteInsertHandler<T>
   private final TaskContextSupplier taskContextSupplier;
   private final WriteHandleFactory writeHandleFactory;
 
+  // Tracks number of skipped records seen by this instance
+  private int numSkippedRecords = 0;
+
   private final List<WriteStatus> statuses = new ArrayList<>();
   // Stores the open HoodieWriteHandle for each table partition path
   // If the records are consumed in order, there should be only one open 
handle in this mapping.
@@ -72,6 +80,15 @@ public class CopyOnWriteInsertHandler<T>
   public void consume(HoodieInsertValueGenResult<HoodieRecord> genResult) {
     final HoodieRecord record = genResult.getResult();
     String partitionPath = record.getPartitionPath();
+    // Just skip the ignored record, and do not create partitions on the fs
+    try {
+      if (record.shouldIgnore(genResult.schema, config.getProps())) {
+        numSkippedRecords++;
+        return;
+      }
+    } catch (IOException e) {
+      LOG.warn("Writing record should be ignored " + record, e);
+    }
     HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
     if (handle == null) {
       // If the records are sorted, this means that we encounter a new 
partition path
@@ -100,7 +117,7 @@ public class CopyOnWriteInsertHandler<T>
   @Override
   public List<WriteStatus> finish() {
     closeOpenHandles();
-    checkState(statuses.size() > 0);
+    checkState(statuses.size() + numSkippedRecords > 0);
     return statuses;
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index 3b42d40a1a2..1a0dcc09ffc 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -34,8 +34,6 @@ import org.apache.hudi.util.ExecutorFactory;
 import java.util.Iterator;
 import java.util.List;
 
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
-
 public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
 
   private final boolean useWriterSchema;
@@ -78,7 +76,6 @@ public class SparkLazyInsertIterable<T> extends 
HoodieLazyInsertIterable<T> {
           getTransformer(schema, hoodieConfig), 
hoodieTable.getPreExecuteRunnable());
 
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
-      checkState(result != null && !result.isEmpty());
       return result;
     } catch (Exception e) {
       throw new HoodieException(e);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index da8d3183f00..d5dcfd01ad1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -942,4 +942,85 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
       )
     }
   }
+
+  test("Test MOR Table with create empty partitions") {
+    withTempDir { tmp =>
+
+      val sourceTable = generateTableName
+      val path1 = tmp.getCanonicalPath.concat("/source")
+      spark.sql(
+        s"""
+           | create table $sourceTable (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           | ) using hudi
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(dt)
+           | location '${path1}'
+         """.stripMargin)
+
+      spark.sql(s"insert into $sourceTable values(1, 'a1', cast(3.01 as 
double), 11, '2022-09-26'),(2, 'a2', cast(3.02 as double), 12, 
'2022-09-27'),(3, 'a3', cast(3.03 as double), 13, '2022-09-28'),(4, 'a4', 
cast(3.04 as double), 14, '2022-09-29')")
+
+      checkAnswer(s"select id, name, price, ts, dt from $sourceTable order by 
id")(
+        Seq(1, "a1", 3.01, 11,"2022-09-26"),
+        Seq(2, "a2", 3.02, 12,"2022-09-27"),
+        Seq(3, "a3", 3.03, 13,"2022-09-28"),
+        Seq(4, "a4", 3.04, 14,"2022-09-29")
+      )
+
+      val path2 = tmp.getCanonicalPath.concat("/target")
+      val destTable = generateTableName
+      spark.sql(
+        s"""
+           | create table $destTable (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long,
+           |  dt string
+           | ) using hudi
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+           | partitioned by(dt)
+           | location '${path2}'
+         """.stripMargin)
+
+      spark.sql(s"insert into $destTable values(1, 'd1', cast(3.01 as double), 
11, '2022-09-26'),(2, 'd2', cast(3.02 as double), 12, '2022-09-26'),(3, 'd3', 
cast(3.03 as double), 13, '2022-09-26')")
+
+      checkAnswer(s"select id, name, price, ts, dt from $destTable order by 
id")(
+        Seq(1, "d1", 3.01, 11,"2022-09-26"),
+        Seq(2, "d2", 3.02, 12,"2022-09-26"),
+        Seq(3, "d3", 3.03, 13,"2022-09-26")
+      )
+
+      // merge operation
+      spark.sql(
+        s"""
+           |merge into $destTable h0
+           |using (
+           | select id, name, price, ts, dt from $sourceTable
+           | ) s0
+           | on h0.id = s0.id and h0.dt = s0.dt
+           | when matched then update set *
+           |""".stripMargin)
+
+      checkAnswer(s"select id, name, price, ts, dt from $destTable order by 
id")(
+        Seq(1, "a1", 3.01, 11,"2022-09-26"),
+        Seq(2, "d2", 3.02, 12,"2022-09-26"),
+        Seq(3, "d3", 3.03, 13,"2022-09-26")
+      )
+      // check partitions
+      checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
+    }
+  }
 }

Reply via email to