This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a0f598d  [HUDI-2089]fix the bug that metatable cannot support 
non_partition table (#3182)
a0f598d is described below

commit a0f598d371938060a17aca7603c080c22488cf57
Author: xiarixiaoyao <[email protected]>
AuthorDate: Tue Jul 6 11:14:05 2021 +0800

    [HUDI-2089]fix the bug that metatable cannot support non_partition table 
(#3182)
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  7 ++--
 .../functional/HoodieSparkSqlWriterSuite.scala     | 46 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4f1d79b..b81e00b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -318,7 +318,7 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
             createInstantTime);
       }).forEach(status -> {
         HoodieWriteStat writeStat = new HoodieWriteStat();
-        writeStat.setPath(partition + Path.SEPARATOR + 
status.getPath().getName());
+        writeStat.setPath((partition.isEmpty() ? "" : partition + 
Path.SEPARATOR) + status.getPath().getName());
         writeStat.setPartitionPath(partition);
         writeStat.setTotalWriteBytes(status.getLen());
         commitMetadata.addWriteStat(partition, writeStat);
@@ -374,9 +374,10 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
             .collect(Collectors.toList());
 
         if (p.getRight().length > filesInDir.size()) {
-          // Is a partition. Add all data files to result.
           String partitionName = FSUtils.getRelativePartitionPath(new 
Path(datasetMetaClient.getBasePath()), p.getLeft());
-          partitionToFileStatus.put(partitionName, filesInDir);
+          // deal with Non-partition table, we should exclude .hoodie
+          partitionToFileStatus.put(partitionName, filesInDir.stream()
+              .filter(f -> 
!f.getPath().getName().equals(HoodieTableMetaClient.METAFOLDER_NAME)).collect(Collectors.toList()));
         } else {
           // Add sub-dirs to the queue
           pathsToList.addAll(Arrays.stream(p.getRight())
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
index 44be4d1..a06eeb1 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/HoodieSparkSqlWriterSuite.scala
@@ -700,4 +700,50 @@ class HoodieSparkSqlWriterSuite extends FunSuite with 
Matchers {
       }
     }
   }
+
+  test("test Non partition table with metatable support") {
+    List(DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL, 
DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL).foreach { tableType =>
+      initSparkContext("testNonPartitionTableWithMetaTable")
+      initSparkContext("test_schema_evolution")
+      val path = java.nio.file.Files.createTempDirectory("hoodie_test_path")
+      val basePath = path.toAbsolutePath.toString
+      try {
+        val df = spark.range(0, 10).toDF("keyid")
+          .withColumn("col3", expr("keyid"))
+          .withColumn("age", expr("keyid + 1000"))
+
+        df.write.format("hudi")
+          .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, tableType)
+          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key, "col3")
+          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key, "keyid")
+          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key, "")
+          .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key, 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+          .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "insert")
+          .option("hoodie.insert.shuffle.parallelism", "1")
+          .option("hoodie.metadata.enable", "true")
+          .option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_test")
+          .mode(SaveMode.Overwrite).save(basePath)
+        // upsert same record again
+        val df_update = spark.range(0, 10).toDF("keyid")
+          .withColumn("col3", expr("keyid"))
+          .withColumn("age", expr("keyid + 2000"))
+        df_update.write.format("hudi")
+          .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY.key, tableType)
+          .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY.key, "col3")
+          .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY.key, "keyid")
+          .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY.key, "")
+          .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY.key, 
"org.apache.hudi.keygen.NonpartitionedKeyGenerator")
+          .option(DataSourceWriteOptions.OPERATION_OPT_KEY.key, "upsert")
+          .option("hoodie.upsert.shuffle.parallelism", "1")
+          .option("hoodie.metadata.enable", "true")
+          .option(HoodieWriteConfig.TABLE_NAME.key, "hoodie_test")
+          .mode(SaveMode.Append).save(basePath)
+        assert(spark.read.format("hudi").load(basePath).count() == 10)
+        assert(spark.read.format("hudi").load(basePath).where("age >= 
2000").count() == 10)
+      } finally {
+        spark.stop()
+        FileUtils.deleteDirectory(path.toFile)
+      }
+    }
+  }
 }

Reply via email to