This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b7ccecf3205 [HUDI-7492] Fix the incorrect keygenerator specification 
for multi partition or multi primary key tables creation (#10840)
b7ccecf3205 is described below

commit b7ccecf32051e1f880826ee21a4b97cfd3a06f87
Author: empcl <[email protected]>
AuthorDate: Mon Mar 18 16:27:09 2024 +0800

    [HUDI-7492] Fix the incorrect keygenerator specification for multi 
partition or multi primary key tables creation (#10840)
---
 .../org/apache/hudi/table/HoodieTableFactory.java  |  7 +---
 .../apache/hudi/table/catalog/HoodieCatalog.java   |  4 ++
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  3 ++
 .../java/org/apache/hudi/util/StreamerUtil.java    | 12 ++++++
 .../hudi/table/catalog/TestHoodieCatalog.java      | 43 +++++++++++++++++++++
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  | 45 ++++++++++++++++++++++
 6 files changed, 108 insertions(+), 6 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 68642b39da8..65f0199ae80 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -28,7 +28,6 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
 import org.apache.hudi.util.AvroSchemaConverter;
@@ -318,11 +317,7 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
       }
     }
     boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
-    if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, 
FlinkOptions.KEYGEN_CLASS_NAME)) {
-      conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, 
ComplexAvroKeyGenerator.class.getName());
-      LOG.info("Table option [{}] is reset to {} because record key or 
partition path has two or more fields",
-          FlinkOptions.KEYGEN_CLASS_NAME.key(), 
ComplexAvroKeyGenerator.class.getName());
-    }
+    StreamerUtil.checkKeygenGenerator(complexHoodieKey, conf);
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index d25db7d82fa..f9088b4096c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -343,6 +343,10 @@ public class HoodieCatalog extends AbstractCatalog {
       final String partitions = String.join(",", 
resolvedTable.getPartitionKeys());
       conf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
       options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
+
+      final String[] pks = 
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+      boolean complexHoodieKey = pks.length > 1 || 
resolvedTable.getPartitionKeys().size() > 1;
+      StreamerUtil.checkKeygenGenerator(complexHoodieKey, conf);
     } else {
       conf.setString(FlinkOptions.KEYGEN_CLASS_NAME.key(), 
NonpartitionedAvroKeyGenerator.class.getName());
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 3e409d11f5d..ce0230e6939 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -502,6 +502,9 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     if (catalogTable.isPartitioned() && 
!flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
       final String partitions = String.join(",", 
catalogTable.getPartitionKeys());
       flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
+      final String[] pks = 
flinkConf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+      boolean complexHoodieKey = pks.length > 1 || 
catalogTable.getPartitionKeys().size() > 1;
+      StreamerUtil.checkKeygenGenerator(complexHoodieKey, flinkConf);
     }
 
     if (!catalogTable.isPartitioned()) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e98242d4df3..2b8656f2148 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -48,6 +48,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
 import org.apache.hudi.sink.transform.ChainedTransformer;
@@ -534,4 +535,15 @@ public class StreamerUtil {
       }
     }
   }
+
+  /**
+   * Resets the key generator to {@code ComplexAvroKeyGenerator} when the hoodie key is
+   * complex (multiple record key or partition fields) and no custom key generator is set.
+   */
+  public static void checkKeygenGenerator(boolean isComplexHoodieKey, 
Configuration conf) {
+    if (isComplexHoodieKey && FlinkOptions.isDefaultValueDefined(conf, 
FlinkOptions.KEYGEN_CLASS_NAME)) {
+      conf.setString(FlinkOptions.KEYGEN_CLASS_NAME, 
ComplexAvroKeyGenerator.class.getName());
+      LOG.info("Table option [{}] is reset to {} because record key or 
partition path has two or more fields",
+          FlinkOptions.KEYGEN_CLASS_NAME.key(), 
ComplexAvroKeyGenerator.class.getName());
+    }
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 0207022903b..d883b72b075 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
@@ -35,6 +36,7 @@ import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.DataTypes;
@@ -108,6 +110,13 @@ public class TestHoodieCatalog {
           Collections.emptyList(),
           CONSTRAINTS);
 
+  private static final UniqueConstraint MULTI_KEY_CONSTRAINTS = 
UniqueConstraint.primaryKey("uuid", Arrays.asList("uuid", "name"));
+  private static final ResolvedSchema CREATE_MULTI_KEY_TABLE_SCHEMA =
+      new ResolvedSchema(
+          CREATE_COLUMNS,
+          Collections.emptyList(),
+          MULTI_KEY_CONSTRAINTS);
+
   private static final List<Column> EXPECTED_TABLE_COLUMNS =
       CREATE_COLUMNS.stream()
           .map(
@@ -258,6 +267,40 @@ public class TestHoodieCatalog {
     String keyGeneratorClassName = 
metaClient.getTableConfig().getKeyGeneratorClassName();
     assertEquals(keyGeneratorClassName, 
SimpleAvroKeyGenerator.class.getName());
 
+    // validate single key and multiple partition for partitioned table
+    ObjectPath singleKeyMultiplePartitionPath = new 
ObjectPath(TEST_DEFAULT_DATABASE, "tb_skmp" + System.currentTimeMillis());
+    final ResolvedCatalogTable singleKeyMultiplePartitionTable = new 
ResolvedCatalogTable(
+        CatalogTable.of(
+            
Schema.newBuilder().fromResolvedSchema(CREATE_TABLE_SCHEMA).build(),
+            "test",
+            Lists.newArrayList("par1", "par2"),
+            EXPECTED_OPTIONS),
+        CREATE_TABLE_SCHEMA
+    );
+
+    catalog.createTable(singleKeyMultiplePartitionPath, 
singleKeyMultiplePartitionTable, false);
+    metaClient =
+        StreamerUtil.createMetaClient(catalog.inferTablePath(catalogPathStr, 
singleKeyMultiplePartitionPath), new org.apache.hadoop.conf.Configuration());
+    keyGeneratorClassName = 
metaClient.getTableConfig().getKeyGeneratorClassName();
+    assertThat(keyGeneratorClassName, 
is(ComplexAvroKeyGenerator.class.getName()));
+
+    // validate multiple key and single partition for partitioned table
+    ObjectPath multipleKeySinglePartitionPath = new 
ObjectPath(TEST_DEFAULT_DATABASE, "tb_mksp" + System.currentTimeMillis());
+    final ResolvedCatalogTable multipleKeySinglePartitionTable = new 
ResolvedCatalogTable(
+        CatalogTable.of(
+            
Schema.newBuilder().fromResolvedSchema(CREATE_MULTI_KEY_TABLE_SCHEMA).build(),
+            "test",
+            Lists.newArrayList("par1"),
+            EXPECTED_OPTIONS),
+        CREATE_MULTI_KEY_TABLE_SCHEMA
+    );
+
+    catalog.createTable(multipleKeySinglePartitionPath, 
multipleKeySinglePartitionTable, false);
+    metaClient =
+        StreamerUtil.createMetaClient(catalog.inferTablePath(catalogPathStr, 
multiKeySinglePartitionPath), new org.apache.hadoop.conf.Configuration());
+    keyGeneratorClassName = 
metaClient.getTableConfig().getKeyGeneratorClassName();
+    assertThat(keyGeneratorClassName, 
is(ComplexAvroKeyGenerator.class.getName()));
+
     // validate key generator for non partitioned table
     ObjectPath nonPartitionPath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb");
     final ResolvedCatalogTable nonPartitionCatalogTable = new 
ResolvedCatalogTable(
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 45fc3d6f386..d88bb0326ef 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -29,11 +29,13 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieCatalogException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableSchema;
@@ -71,6 +73,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.hudi.configuration.FlinkOptions.PRECOMBINE_FIELD;
+import static 
org.apache.hudi.keygen.constant.KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
 import static 
org.apache.hudi.table.catalog.HoodieCatalogTestUtils.createHiveConf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -97,6 +100,26 @@ public class TestHoodieHiveCatalog {
           .primaryKey("uuid")
           .build();
   List<String> partitions = Collections.singletonList("par1");
+
+  TableSchema multiKeySinglePartitionTableSchema =
+      TableSchema.builder()
+          .field("uuid", DataTypes.INT().notNull())
+          .field("name", DataTypes.STRING().notNull())
+          .field("age", DataTypes.INT())
+          .field("par1", DataTypes.STRING())
+          .primaryKey("uuid", "name")
+          .build();
+
+  TableSchema singleKeyMultiPartitionTableSchema =
+      TableSchema.builder()
+          .field("uuid", DataTypes.INT().notNull())
+          .field("name", DataTypes.STRING())
+          .field("par1", DataTypes.STRING())
+          .field("par2", DataTypes.STRING())
+          .primaryKey("uuid")
+          .build();
+  List<String> multiPartitions = Lists.newArrayList("par1", "par2");
+
   private static HoodieHiveCatalog hoodieCatalog;
   private final ObjectPath tablePath = new ObjectPath("default", "test");
 
@@ -201,6 +224,28 @@ public class TestHoodieHiveCatalog {
     String keyGeneratorClassName = 
metaClient.getTableConfig().getKeyGeneratorClassName();
     assertEquals(keyGeneratorClassName, 
SimpleAvroKeyGenerator.class.getName());
 
+    // validate single key and multiple partition for partitioned table
+    ObjectPath singleKeyMultiPartitionPath = new ObjectPath("default", 
"tb_skmp_" + System.currentTimeMillis());
+    CatalogTable singleKeyMultiPartitionTable =
+        new CatalogTableImpl(singleKeyMultiPartitionTableSchema, 
multiPartitions, options, "hudi table");
+    hoodieCatalog.createTable(singleKeyMultiPartitionPath, 
singleKeyMultiPartitionTable, false);
+
+    HoodieTableMetaClient singleKeyMultiPartitionTableMetaClient =
+        
StreamerUtil.createMetaClient(hoodieCatalog.inferTablePath(singleKeyMultiPartitionPath,
 singleKeyMultiPartitionTable), createHiveConf());
+    
assertThat(singleKeyMultiPartitionTableMetaClient.getTableConfig().getKeyGeneratorClassName(),
 is(ComplexAvroKeyGenerator.class.getName()));
+
+    // validate multiple key and single partition for partitioned table
+    ObjectPath multiKeySinglePartitionPath = new ObjectPath("default", 
"tb_mksp_" + System.currentTimeMillis());
+
+    options.remove(RECORDKEY_FIELD_NAME.key());
+    CatalogTable multiKeySinglePartitionTable =
+        new CatalogTableImpl(multiKeySinglePartitionTableSchema, partitions, 
options, "hudi table");
+    hoodieCatalog.createTable(multiKeySinglePartitionPath, 
multiKeySinglePartitionTable, false);
+
+    HoodieTableMetaClient multiKeySinglePartitionTableMetaClient =
+        
StreamerUtil.createMetaClient(hoodieCatalog.inferTablePath(multiKeySinglePartitionPath,
 multiKeySinglePartitionTable), createHiveConf());
+    
assertThat(multiKeySinglePartitionTableMetaClient.getTableConfig().getKeyGeneratorClassName(),
 is(ComplexAvroKeyGenerator.class.getName()));
+
     // validate key generator for non partitioned table
     ObjectPath nonPartitionPath = new ObjectPath("default", "tb_" + tableType);
     CatalogTable nonPartitionTable =

Reply via email to