This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f7f5d4c  [HUDI-2184] Support setting hive sync partition extractor class based on flink configuration (#3284)
f7f5d4c is described below

commit f7f5d4cc6db1230e653346953d87c6ff480c56eb
Author: swuferhong <[email protected]>
AuthorDate: Fri Jul 30 17:24:00 2021 +0800

    [HUDI-2184] Support setting hive sync partition extractor class based on flink configuration (#3284)
---
 .../org/apache/hudi/table/HoodieTableFactory.java  | 13 ++++++
 .../apache/hudi/table/TestHoodieTableFactory.java  | 52 ++++++++++++++++++++++
 2 files changed, 65 insertions(+)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 753ced4..a8fe93b 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -148,6 +149,8 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
     setupHoodieKeyOptions(conf, table);
     // compaction options
     setupCompactionOptions(conf);
+    // hive options
+    setupHiveOptions(conf);
     // infer avro schema from physical DDL schema
     inferAvroSchema(conf, schema.toRowDataType().notNull().getLogicalType());
   }
@@ -208,6 +211,16 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   }
 
   /**
+   * Sets up the hive sync options: switches the partition extractor to {@link MultiPartKeysValueExtractor} when hive style partitioning is disabled and {@link FlinkOptions#isDefaultValueDefined} holds for the extractor option (presumably: the user did not override it — confirm), modifying {@code conf} in place.
+   */
+  private static void setupHiveOptions(Configuration conf) {
+    if (!conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING)
+        && FlinkOptions.isDefaultValueDefined(conf, 
FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS)) {
+      conf.setString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS, 
MultiPartKeysValueExtractor.class.getName());
+    }
+  }
+
+  /**
    * Inferences the deserialization Avro schema from the table schema (e.g. 
the DDL)
    * if both options {@link FlinkOptions#SOURCE_AVRO_SCHEMA_PATH} and
    * {@link FlinkOptions#SOURCE_AVRO_SCHEMA} are not specified.
diff --git 
a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java 
b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index e40741c..799739c 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.table;
 
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.hive.SlashEncodedDayPartitionValueExtractor;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.util.StreamerUtil;
@@ -173,6 +175,31 @@ public class TestHoodieTableFactory {
   }
 
   @Test
+  void testSetupHiveOptionsForSource() {
+    // checks the hive sync partition extractor chosen for a table source; definition with simple primary key and partition path
+    TableSchema schema1 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    // assumes this.conf starts with hive style partitioning disabled (set up outside this view): expect MultiPartKeysValueExtractor
+    final MockContext sourceContext1 = MockContext.getInstance(this.conf, 
schema1, "f2");
+    final HoodieTableSource tableSource1 = (HoodieTableSource) new 
HoodieTableFactory().createDynamicTableSource(sourceContext1);
+    final Configuration conf1 = tableSource1.getConf();
+    
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), 
is(MultiPartKeysValueExtractor.class.getName()));
+
+    // enable hive style partitioning: the extractor is then expected to be SlashEncodedDayPartitionValueExtractor
+    this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
+
+    final MockContext sourceContext2 = MockContext.getInstance(this.conf, 
schema1, "f2");
+    final HoodieTableSource tableSource2 = (HoodieTableSource) new 
HoodieTableFactory().createDynamicTableSource(sourceContext2);
+    final Configuration conf2 = tableSource2.getConf();
+    
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), 
is(SlashEncodedDayPartitionValueExtractor.class.getName()));
+  }
+
+  @Test
   void testSetupCleaningOptionsForSource() {
     // definition with simple primary key and partition path
     TableSchema schema1 = TableSchema.builder()
@@ -260,6 +287,31 @@ public class TestHoodieTableFactory {
   }
 
   @Test
+  void testSetupHiveOptionsForSink() {
+    // checks the hive sync partition extractor chosen for a table sink; definition with simple primary key and partition path
+    TableSchema schema1 = TableSchema.builder()
+        .field("f0", DataTypes.INT().notNull())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("f2", DataTypes.TIMESTAMP(3))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .primaryKey("f0")
+        .build();
+    // assumes this.conf starts with hive style partitioning disabled (set up outside this view): expect MultiPartKeysValueExtractor
+    final MockContext sinkContext1 = MockContext.getInstance(this.conf, 
schema1, "f2");
+    final HoodieTableSink tableSink1 = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(sinkContext1);
+    final Configuration conf1 = tableSink1.getConf();
+    
assertThat(conf1.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), 
is(MultiPartKeysValueExtractor.class.getName()));
+
+    // enable hive style partitioning: the extractor is then expected to be SlashEncodedDayPartitionValueExtractor
+    this.conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, true);
+
+    final MockContext sinkContext2 = MockContext.getInstance(this.conf, 
schema1, "f2");
+    final HoodieTableSink tableSink2 = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(sinkContext2);
+    final Configuration conf2 = tableSink2.getConf();
+    
assertThat(conf2.getString(FlinkOptions.HIVE_SYNC_PARTITION_EXTRACTOR_CLASS), 
is(SlashEncodedDayPartitionValueExtractor.class.getName()));
+  }
+
+  @Test
   void testSetupCleaningOptionsForSink() {
     // definition with simple primary key and partition path
     TableSchema schema1 = TableSchema.builder()

Reply via email to