This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6eca06d [HUDI-2105] Compaction Failed For MergeInto MOR Table (#3190)
6eca06d is described below
commit 6eca06d074520140d7bc67b48bd2b9a5b76f0a87
Author: pengzhiwei <[email protected]>
AuthorDate: Thu Jul 1 23:40:14 2021 +0800
[HUDI-2105] Compaction Failed For MergeInto MOR Table (#3190)
---
.../HoodieSparkMergeOnReadTableCompactor.java | 14 +++++-
.../apache/spark/sql/hudi/TestMergeIntoTable.scala | 51 +++++++++++++++++++++-
2 files changed, 63 insertions(+), 2 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
index 117be74..34018bb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/compact/HoodieSparkMergeOnReadTableCompactor.java
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
@@ -94,6 +95,18 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
return jsc.emptyRDD();
}
HoodieTableMetaClient metaClient = hoodieTable.getMetaClient();
+ TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
+
+ // Here we first use the table schema as the reader schema to read the
+ // log files. That is because, in the case of MergeInto, config.getSchema
+ // may not be the same as the table schema.
+ try {
+ Schema readerSchema = schemaUtil.getTableAvroSchema(false);
+ config.setSchema(readerSchema.toString());
+ } catch (Exception e) {
+ // If there is no commit in the table, just ignore the exception.
+ }
+
// Compacting is very similar to applying updates to existing file
HoodieSparkCopyOnWriteTable table = new HoodieSparkCopyOnWriteTable(config, context, metaClient);
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
@@ -108,7 +121,6 @@ public class HoodieSparkMergeOnReadTableCompactor<T extends HoodieRecordPayload>
private List<WriteStatus> compact(HoodieSparkCopyOnWriteTable hoodieCopyOnWriteTable, HoodieTableMetaClient metaClient, HoodieWriteConfig config, CompactionOperation operation, String instantTime) throws IOException {
FileSystem fs = metaClient.getFs();
-
Schema readerSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()));
LOG.info("Compacting base " + operation.getDataFileName() + " with delta files " + operation.getDeltaFileNames() + " for commit " + instantTime);
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
index 9b33fb3..28c47aa 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -575,7 +575,6 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
checkAnswer(s"select id, name, price from $tableName")(
Seq(1, "a1", 10.0)
)
-
spark.sql(
s"""
| merge into $tableName
@@ -593,4 +592,54 @@ class TestMergeIntoTable extends TestHoodieSqlBase {
}
}
}
+
+ test("Test MergeInto For MOR With Compaction On") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '${tmp.getCanonicalPath}'
+ | options (
+ | primaryKey = 'id',
+ | type = 'mor',
+ | preCombineField = 'ts',
+ | hoodie.compact.inline = 'true'
+ | )
+ """.stripMargin)
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(2, 'a2', 10, 1000)")
+ spark.sql(s"insert into $tableName values(3, 'a3', 10, 1000)")
+ spark.sql(s"insert into $tableName values(4, 'a4', 10, 1000)")
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1000),
+ Seq(2, "a2", 10.0, 1000),
+ Seq(3, "a3", 10.0, 1000),
+ Seq(4, "a4",10.0, 1000)
+ )
+
+ spark.sql(
+ s"""
+ |merge into $tableName h0
+ |using (
+ | select 4 as id, 'a4' as name, 11 as price, 1000 as ts
+ | ) s0
+ | on h0.id = s0.id
+ | when matched then update set *
+ |""".stripMargin)
+
+ // 5 delta commits (4 inserts + 1 merge) will trigger inline compaction.
+ checkAnswer(s"select id, name, price, ts from $tableName order by id")(
+ Seq(1, "a1", 10.0, 1000),
+ Seq(2, "a2", 10.0, 1000),
+ Seq(3, "a3", 10.0, 1000),
+ Seq(4, "a4", 11.0, 1000)
+ )
+ }
+ }
}
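A note on why the test expects compaction to run: the four inserts plus the
merge produce five delta commits, and inline compaction fires once
hoodie.compact.inline.max.delta.commits delta commits (default 5) have
accumulated. The sketch below shows the equivalent writer-side configuration,
assuming Hudi's HoodieCompactionConfig builder API; it illustrates the knobs
involved and is not part of this patch.

import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;

final class InlineCompactionConfigExample {

  // Writer-side equivalent of the SQL options used in the test: inline
  // compaction on, triggered after 5 delta commits (the default).
  static HoodieWriteConfig build(String basePath, String tableName) {
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .forTable(tableName)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withInlineCompaction(true)                 // hoodie.compact.inline
            .withMaxNumDeltaCommitsBeforeCompaction(5)  // hoodie.compact.inline.max.delta.commits
            .build())
        .build();
  }
}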