This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 61051a3ef97 [HUDI-5031] Fix MERGE INTO creates empty partition files
when source table has partitions but target table does not (#6983)
61051a3ef97 is described below
commit 61051a3ef971b2066e81fee80f34f3243d839127
Author: Ming Wei <[email protected]>
AuthorDate: Thu Oct 19 07:28:01 2023 +0800
[HUDI-5031] Fix MERGE INTO creates empty partition files when source table
has partitions but target table does not (#6983)
* [HUDI-5031] Fix MERGE INTO creates empty partition files when source
table has partitions but target table does not
Co-authored-by: jameswei <[email protected]>
Co-authored-by: balaji.varadarajan <[email protected]>
---
.../hudi/execution/CopyOnWriteInsertHandler.java | 19 ++++-
.../hudi/execution/SparkLazyInsertIterable.java | 3 -
.../spark/sql/hudi/TestMergeIntoTable2.scala | 81 ++++++++++++++++++++++
3 files changed, 99 insertions(+), 4 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
index fd932a66a0a..0191b8f9d3a 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java
@@ -27,7 +27,10 @@ import
org.apache.hudi.execution.HoodieLazyInsertIterable.HoodieInsertValueGenRe
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -41,6 +44,8 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
public class CopyOnWriteInsertHandler<T>
implements HoodieConsumer<HoodieInsertValueGenResult<HoodieRecord>,
List<WriteStatus>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(CopyOnWriteInsertHandler.class);
+
private final HoodieWriteConfig config;
private final String instantTime;
private final boolean areRecordsSorted;
@@ -49,6 +54,9 @@ public class CopyOnWriteInsertHandler<T>
private final TaskContextSupplier taskContextSupplier;
private final WriteHandleFactory writeHandleFactory;
+ // Tracks number of skipped records seen by this instance
+ private int numSkippedRecords = 0;
+
private final List<WriteStatus> statuses = new ArrayList<>();
// Stores the open HoodieWriteHandle for each table partition path
// If the records are consumed in order, there should be only one open
handle in this mapping.
@@ -72,6 +80,15 @@ public class CopyOnWriteInsertHandler<T>
public void consume(HoodieInsertValueGenResult<HoodieRecord> genResult) {
final HoodieRecord record = genResult.getResult();
String partitionPath = record.getPartitionPath();
+ // Just skip the ignored record; do not create partition directories on the filesystem
+ try {
+ if (record.shouldIgnore(genResult.schema, config.getProps())) {
+ numSkippedRecords++;
+ return;
+ }
+ } catch (IOException e) {
+ LOG.warn("Writing record should be ignore " + record, e);
+ }
HoodieWriteHandle<?,?,?,?> handle = handles.get(partitionPath);
if (handle == null) {
// If the records are sorted, this means that we encounter a new
partition path
@@ -100,7 +117,7 @@ public class CopyOnWriteInsertHandler<T>
@Override
public List<WriteStatus> finish() {
closeOpenHandles();
- checkState(statuses.size() > 0);
+ checkState(statuses.size() + numSkippedRecords > 0);
return statuses;
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index 3b42d40a1a2..1a0dcc09ffc 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -34,8 +34,6 @@ import org.apache.hudi.util.ExecutorFactory;
import java.util.Iterator;
import java.util.List;
-import static org.apache.hudi.common.util.ValidationUtils.checkState;
-
public class SparkLazyInsertIterable<T> extends HoodieLazyInsertIterable<T> {
private final boolean useWriterSchema;
@@ -78,7 +76,6 @@ public class SparkLazyInsertIterable<T> extends
HoodieLazyInsertIterable<T> {
getTransformer(schema, hoodieConfig),
hoodieTable.getPreExecuteRunnable());
final List<WriteStatus> result = bufferedIteratorExecutor.execute();
- checkState(result != null && !result.isEmpty());
return result;
} catch (Exception e) {
throw new HoodieException(e);
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
index da8d3183f00..d5dcfd01ad1 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable2.scala
@@ -942,4 +942,85 @@ class TestMergeIntoTable2 extends HoodieSparkSqlTestBase {
)
}
}
+
+ test("Test MOR Table with create empty partitions") {
+ withTempDir { tmp =>
+
+ val sourceTable = generateTableName
+ val path1 = tmp.getCanonicalPath.concat("/source")
+ spark.sql(
+ s"""
+ | create table $sourceTable (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ | ) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(dt)
+ | location '${path1}'
+ """.stripMargin)
+
+ spark.sql(s"insert into $sourceTable values(1, 'a1', cast(3.01 as
double), 11, '2022-09-26'),(2, 'a2', cast(3.02 as double), 12,
'2022-09-27'),(3, 'a3', cast(3.03 as double), 13, '2022-09-28'),(4, 'a4',
cast(3.04 as double), 14, '2022-09-29')")
+
+ checkAnswer(s"select id, name, price, ts, dt from $sourceTable order by
id")(
+ Seq(1, "a1", 3.01, 11,"2022-09-26"),
+ Seq(2, "a2", 3.02, 12,"2022-09-27"),
+ Seq(3, "a3", 3.03, 13,"2022-09-28"),
+ Seq(4, "a4", 3.04, 14,"2022-09-29")
+ )
+
+ val path2 = tmp.getCanonicalPath.concat("/target")
+ val destTable = generateTableName
+ spark.sql(
+ s"""
+ | create table $destTable (
+ | id int,
+ | name string,
+ | price double,
+ | ts long,
+ | dt string
+ | ) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ | partitioned by(dt)
+ | location '${path2}'
+ """.stripMargin)
+
+ spark.sql(s"insert into $destTable values(1, 'd1', cast(3.01 as double),
11, '2022-09-26'),(2, 'd2', cast(3.02 as double), 12, '2022-09-26'),(3, 'd3',
cast(3.03 as double), 13, '2022-09-26')")
+
+ checkAnswer(s"select id, name, price, ts, dt from $destTable order by
id")(
+ Seq(1, "d1", 3.01, 11,"2022-09-26"),
+ Seq(2, "d2", 3.02, 12,"2022-09-26"),
+ Seq(3, "d3", 3.03, 13,"2022-09-26")
+ )
+
+ // merge operation
+ spark.sql(
+ s"""
+ |merge into $destTable h0
+ |using (
+ | select id, name, price, ts, dt from $sourceTable
+ | ) s0
+ | on h0.id = s0.id and h0.dt = s0.dt
+ | when matched then update set *
+ |""".stripMargin)
+
+ checkAnswer(s"select id, name, price, ts, dt from $destTable order by
id")(
+ Seq(1, "a1", 3.01, 11,"2022-09-26"),
+ Seq(2, "d2", 3.02, 12,"2022-09-26"),
+ Seq(3, "d3", 3.03, 13,"2022-09-26")
+ )
+ // check partitions
+ checkAnswer(s"show partitions $destTable")(Seq("dt=2022-09-26"))
+ }
+ }
}