This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6eca06d  [HUDI-2105] Compaction Failed For MergeInto MOR Table (#3190)
6eca06d is described below

commit 6eca06d074520140d7bc67b48bd2b9a5b76f0a87
Author: pengzhiwei <[email protected]>
AuthorDate: Thu Jul 1 23:40:14 2021 +0800

    [HUDI-2105] Compaction Failed For MergeInto MOR Table (#3190)
---
 .../HoodieSparkMergeOnReadTableCompactor.java      | 14 +++++-
 .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 51 +++++++++++++++++++++-
 2 files changed, 63 insertions(+), 2 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 117be74..34018bb 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
@@ -94,6 +95,18 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends 
HoodieRecordPayload>
       return jsc.emptyRDD();
     }
     HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+    TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+
+    // Here we first use the table schema as the reader schema to read the
+    // log files. That is because in the case of MergeInto, config.getSchema
+    // may not be the same as the table schema.
+    try {
+      Schema readerSchema = schemaUtil.getTableAvroSchema(false);
+      config.setSchema(readerSchema.toString());
+    } catch (Exception e) {
+      // If there is no commit in the table, just ignore the exception.
+    }
+
     // Compacting is very similar to applying updates to existing file
     HoodieSparkCopyOnWriteTable table = new 
HoodieSparkCopyOnWriteTable(config, context, metaClient);
     List<CompactionOperation> operations = 
compactionPlan.getOperations().stream()
@@ -108,7 +121,6 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends 
HoodieRecordPayload>
   private List<WriteStatus> compact(HoodieSparkCopyOnWriteTable 
hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient,
       HoodieWriteConfig config, CompactionOperation operation, String 
instantTime) throws IOException {
     FileSystem fs = metaClient.getFs();
-
     Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(config.getSchema()));
     LOG.info("Compacting base " + operation.getDataFileName() + " with delta 
files " + operation.getDeltaFileNames()
         + " for commit " + instantTime);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 9b33fb3..28c47aa 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -575,7 +575,6 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
         checkAnswer(s"select id, name, price from $tableName")(
           Seq(1, "a1", 10.0)
         )
-
         spark.sql(
           s"""
              | merge into $tableName
@@ -593,4 +592,54 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
       }
     }
   }
+
+  test("Test MergeInto For MOR With Compaction On") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}'
+           | options (
+           |  primaryKey ='id',
+           |  type = 'mor',
+           |  preCombineField = 'ts',
+           |  hoodie.compact.inline = 'true'
+           | )
+       """.stripMargin)
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+      spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+      spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 10.0, 1000),
+        Seq(2, "a2", 10.0, 1000),
+        Seq(3, "a3", 10.0, 1000),
+        Seq(4, "a4",10.0, 1000)
+      )
+
+      spark.sql(
+        s"""
+           |merge into $tableName h0
+           |using (
+           | select 4 as id, 'a4' as name, 11 as price, 1000 as ts
+           | ) s0
+           | on h0.id = s0.id
+           | when matched then update set *
+           |""".stripMargin)
+
+      // 5 commits will trigger compaction.
+      checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+        Seq(1, "a1", 10.0, 1000),
+        Seq(2, "a2", 10.0, 1000),
+        Seq(3, "a3", 10.0, 1000),
+        Seq(4, "a4", 11.0, 1000)
+      )
+    }
+  }
 }

Reply via email to