This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f6fc726d0 [HUDI-4140] Fixing hive style partitioning and default 
partition with bulk insert row writer with SimpleKeyGen and virtual keys (#5664)
4f6fc726d0 is described below

commit 4f6fc726d0d3d2dd427210228bbb36cf18893a92
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Mon Jun 6 13:21:00 2022 -0400

    [HUDI-4140] Fixing hive style partitioning and default partition with bulk 
insert row writer with SimpleKeyGen and virtual keys (#5664)
    
    Bulk insert row writer code path had a gap wrt hive style partitioning and 
default partition when virtual keys are enabled with SimpleKeyGen.  This patch 
fixes the issue.
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  4 ++
 .../BulkInsertDataInternalWriterHelper.java        |  7 +++-
 .../HoodieBulkInsertInternalWriterTestBase.java    |  5 +++
 .../TestHoodieBulkInsertDataInternalWriter.java    | 48 ++++++++++++++++++++++
 4 files changed, 63 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 1603965ea9..31ce05173a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1064,6 +1064,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return MarkerType.valueOf(markerType.toUpperCase());
   }
 
+  public boolean isHiveStylePartitioningEnabled() {
+    return 
getBooleanOrDefault(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE);
+  }
+
   public int getMarkersTimelineServerBasedBatchNumThreads() {
     return getInt(MARKERS_TIMELINE_SERVER_BASED_BATCH_NUM_THREADS);
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
index 9a793c4227..c9404afe61 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/BulkInsertDataInternalWriterHelper.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.HoodieInternalWriteStatus;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.PartitionPathEncodeUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
@@ -128,7 +129,11 @@ public class BulkInsertDataInternalWriterHelper {
         if (!keyGeneratorOpt.isPresent()) { // NoPartitionerKeyGen
           partitionPath = "";
         } else if (simpleKeyGen) { // SimpleKeyGen
-          partitionPath = (record.get(simplePartitionFieldIndex, 
simplePartitionFieldDataType)).toString();
+          Object parititionPathValue = record.get(simplePartitionFieldIndex, 
simplePartitionFieldDataType);
+          partitionPath = parititionPathValue != null ? 
parititionPathValue.toString() : 
PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH;
+          if (writeConfig.isHiveStylePartitioningEnabled()) {
+            partitionPath = 
(keyGeneratorOpt.get()).getPartitionPathFields().get(0) + "=" + partitionPath;
+          }
         } else {
           // only BuiltIn key generators are supported if meta fields are 
disabled.
           partitionPath = keyGeneratorOpt.get().getPartitionPath(record, 
structType);
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
index 95a023abb6..54eaadd1e3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/test/java/org/apache/hudi/internal/HoodieBulkInsertInternalWriterTestBase.java
@@ -70,6 +70,10 @@ public class HoodieBulkInsertInternalWriterTestBase extends 
HoodieClientTestHarn
   }
 
   protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields) {
+    return getWriteConfig(populateMetaFields, 
DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().defaultValue());
+  }
+
+  protected HoodieWriteConfig getWriteConfig(boolean populateMetaFields, 
String hiveStylePartitioningValue) {
     Properties properties = new Properties();
     if (!populateMetaFields) {
       
properties.setProperty(DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME().key(), 
SimpleKeyGenerator.class.getName());
@@ -77,6 +81,7 @@ public class HoodieBulkInsertInternalWriterTestBase extends 
HoodieClientTestHarn
       
properties.setProperty(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), 
SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME);
       properties.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), 
"false");
     }
+    
properties.setProperty(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING().key(), 
hiveStylePartitioningValue);
     return getConfigBuilder(basePath, 
timelineServicePort).withProperties(properties).build();
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
index fd943b72e3..f31a344714 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -18,15 +18,18 @@
 
 package org.apache.hudi.internal;
 
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.SparkDatasetTestUtils;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -109,6 +112,51 @@ public class TestHoodieBulkInsertDataInternalWriter extends
     }
   }
 
+  @Test
+  public void testDataInternalWriterHiveStylePartitioning() throws Exception {
+    boolean sorted = true;
+    boolean populateMetaFields = false;
+    // init config and table
+    HoodieWriteConfig cfg = getWriteConfig(populateMetaFields, "true");
+    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
+    for (int i = 0; i < 1; i++) {
+      String instantTime = "00" + i;
+      // init writer
+      HoodieBulkInsertDataInternalWriter writer = new 
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, 
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
+          STRUCT_TYPE, populateMetaFields, sorted);
+
+      int size = 10 + RANDOM.nextInt(1000);
+      // write N rows to partition1, N rows to partition2 and N rows to 
partition3 ... Each batch should create a new RowCreateHandle and a new file
+      int batches = 3;
+      Dataset<Row> totalInputRows = null;
+
+      for (int j = 0; j < batches; j++) {
+        String partitionPath = 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
+        Dataset<Row> inputRows = getRandomRows(sqlContext, size, 
partitionPath, false);
+        writeRows(inputRows, writer);
+        if (totalInputRows == null) {
+          totalInputRows = inputRows;
+        } else {
+          totalInputRows = totalInputRows.union(inputRows);
+        }
+      }
+
+      BaseWriterCommitMessage commitMetadata = (BaseWriterCommitMessage) 
writer.commit();
+      Option<List<String>> fileAbsPaths = Option.of(new ArrayList<>());
+      Option<List<String>> fileNames = Option.of(new ArrayList<>());
+
+      // verify write statuses
+      assertWriteStatuses(commitMetadata.getWriteStatuses(), batches, size, 
sorted, fileAbsPaths, fileNames);
+
+      // verify rows
+      Dataset<Row> result = 
sqlContext.read().parquet(fileAbsPaths.get().toArray(new String[0]));
+      assertOutput(totalInputRows, result, instantTime, fileNames, 
populateMetaFields);
+
+      result.collectAsList().forEach(entry -> 
Assertions.assertTrue(entry.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD).toString()
+          .contains(SparkDatasetTestUtils.PARTITION_PATH_FIELD_NAME + "=")));
+    }
+  }
+
   /**
    * Issue some corrupted or wrong schematized InternalRow after few valid 
InternalRows so that global error is thrown. write batch 1 of valid records 
write batch2 of invalid records which is expected
    * to throw Global Error. Verify global error is set appropriately and only 
first batch of records are written to disk.

Reply via email to