This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b7ccecf3205 [HUDI-7492] Fix the incorrect keygenerator specification
for multi partition or multi primary key tables creation (#10840)
b7ccecf3205 is described below
commit b7ccecf32051e1f880826ee21a4b97cfd3a06f87
Author: empcl <[email protected]>
AuthorDate: Mon Mar 18 16:27:09 2024 +0800
[HUDI-7492] Fix the incorrect keygenerator specification for multi
partition or multi primary key tables creation (#10840)
---
.../org/apache/hudi/table/HoodieTableFactory.java | 7 +---
.../apache/hudi/table/catalog/HoodieCatalog.java | 4 ++
.../hudi/table/catalog/HoodieHiveCatalog.java | 3 ++
.../java/org/apache/hudi/util/StreamerUtil.java | 12 ++++++
.../hudi/table/catalog/TestHoodieCatalog.java | 43 +++++++++++++++++++++
.../hudi/table/catalog/TestHoodieHiveCatalog.java | 45 ++++++++++++++++++++++
6 files changed, 108 insertions(+), 6 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 68642b39da8..65f0199ae80 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -28,7 +28,6 @@ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.index.HoodieIndex;
-import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -318,11 +317,7 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
}
}
boolean complexHoodieKey = pks.length > 1 || partitions.length > 1;
- if (complexHoodieKey && FlinkOptions.isDefaultValueDefined(conf,
FlinkOptions.KEYGEN_CLASS_NAME)) {
- conf.setString(FlinkOptions.KEYGEN_CLASS_NAME,
ComplexAvroKeyGenerator.class.getName());
- LOG.info("Table option [{}] is reset to {} because record key or
partition path has two or more fields",
- FlinkOptions.KEYGEN_CLASS_NAME.key(),
ComplexAvroKeyGenerator.class.getName());
- }
+ StreamerUtil.checkKeygenGenerator(complexHoodieKey, conf);
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index d25db7d82fa..f9088b4096c 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -343,6 +343,10 @@ public class HoodieCatalog extends AbstractCatalog {
final String partitions = String.join(",",
resolvedTable.getPartitionKeys());
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
+
+ final String[] pks =
conf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+ boolean complexHoodieKey = pks.length > 1 ||
resolvedTable.getPartitionKeys().size() > 1;
+ StreamerUtil.checkKeygenGenerator(complexHoodieKey, conf);
} else {
conf.setString(FlinkOptions.KEYGEN_CLASS_NAME.key(),
NonpartitionedAvroKeyGenerator.class.getName());
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 3e409d11f5d..ce0230e6939 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -502,6 +502,9 @@ public class HoodieHiveCatalog extends AbstractCatalog {
if (catalogTable.isPartitioned() &&
!flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
final String partitions = String.join(",",
catalogTable.getPartitionKeys());
flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
+ final String[] pks =
flinkConf.getString(FlinkOptions.RECORD_KEY_FIELD).split(",");
+ boolean complexHoodieKey = pks.length > 1 ||
catalogTable.getPartitionKeys().size() > 1;
+ StreamerUtil.checkKeygenGenerator(complexHoodieKey, flinkConf);
}
if (!catalogTable.isPartitioned()) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index e98242d4df3..2b8656f2148 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -48,6 +48,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.schema.FilebasedSchemaProvider;
import org.apache.hudi.sink.transform.ChainedTransformer;
@@ -534,4 +535,15 @@ public class StreamerUtil {
}
}
}
+
+  /**
+   * Resets the key generator option to {@code ComplexAvroKeyGenerator} when the
+   * hoodie key is complex (multiple record key or partition fields) and the user
+   * has not overridden {@code FlinkOptions.KEYGEN_CLASS_NAME}.
+   */
+ public static void checkKeygenGenerator(boolean isComplexHoodieKey,
Configuration conf) {
+ if (isComplexHoodieKey && FlinkOptions.isDefaultValueDefined(conf,
FlinkOptions.KEYGEN_CLASS_NAME)) {
+ conf.setString(FlinkOptions.KEYGEN_CLASS_NAME,
ComplexAvroKeyGenerator.class.getName());
+ LOG.info("Table option [{}] is reset to {} because record key or
partition path has two or more fields",
+ FlinkOptions.KEYGEN_CLASS_NAME.key(),
ComplexAvroKeyGenerator.class.getName());
+ }
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 0207022903b..d883b72b075 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
@@ -35,6 +36,7 @@ import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.DataTypes;
@@ -108,6 +110,13 @@ public class TestHoodieCatalog {
Collections.emptyList(),
CONSTRAINTS);
+ private static final UniqueConstraint MULTI_KEY_CONSTRAINTS =
UniqueConstraint.primaryKey("uuid", Arrays.asList("uuid", "name"));
+ private static final ResolvedSchema CREATE_MULTI_KEY_TABLE_SCHEMA =
+ new ResolvedSchema(
+ CREATE_COLUMNS,
+ Collections.emptyList(),
+ MULTI_KEY_CONSTRAINTS);
+
private static final List<Column> EXPECTED_TABLE_COLUMNS =
CREATE_COLUMNS.stream()
.map(
@@ -258,6 +267,40 @@ public class TestHoodieCatalog {
String keyGeneratorClassName =
metaClient.getTableConfig().getKeyGeneratorClassName();
assertEquals(keyGeneratorClassName,
SimpleAvroKeyGenerator.class.getName());
+ // validate single key and multiple partition for partitioned table
+ ObjectPath singleKeyMultiplePartitionPath = new
ObjectPath(TEST_DEFAULT_DATABASE, "tb_skmp" + System.currentTimeMillis());
+ final ResolvedCatalogTable singleKeyMultiplePartitionTable = new
ResolvedCatalogTable(
+ CatalogTable.of(
+
Schema.newBuilder().fromResolvedSchema(CREATE_TABLE_SCHEMA).build(),
+ "test",
+ Lists.newArrayList("par1", "par2"),
+ EXPECTED_OPTIONS),
+ CREATE_TABLE_SCHEMA
+ );
+
+ catalog.createTable(singleKeyMultiplePartitionPath,
singleKeyMultiplePartitionTable, false);
+ metaClient =
+ StreamerUtil.createMetaClient(catalog.inferTablePath(catalogPathStr,
singleKeyMultiplePartitionPath), new org.apache.hadoop.conf.Configuration());
+ keyGeneratorClassName =
metaClient.getTableConfig().getKeyGeneratorClassName();
+ assertThat(keyGeneratorClassName,
is(ComplexAvroKeyGenerator.class.getName()));
+
+ // validate multiple key and single partition for partitioned table
+ ObjectPath multipleKeySinglePartitionPath = new
ObjectPath(TEST_DEFAULT_DATABASE, "tb_mksp" + System.currentTimeMillis());
+ final ResolvedCatalogTable multipleKeySinglePartitionTable = new
ResolvedCatalogTable(
+ CatalogTable.of(
+
Schema.newBuilder().fromResolvedSchema(CREATE_MULTI_KEY_TABLE_SCHEMA).build(),
+ "test",
+ Lists.newArrayList("par1"),
+ EXPECTED_OPTIONS),
+      CREATE_MULTI_KEY_TABLE_SCHEMA
+ );
+
+ catalog.createTable(multipleKeySinglePartitionPath,
multipleKeySinglePartitionTable, false);
+    metaClient =
+        StreamerUtil.createMetaClient(catalog.inferTablePath(catalogPathStr,
multipleKeySinglePartitionPath), new org.apache.hadoop.conf.Configuration());
+ keyGeneratorClassName =
metaClient.getTableConfig().getKeyGeneratorClassName();
+ assertThat(keyGeneratorClassName,
is(ComplexAvroKeyGenerator.class.getName()));
+
// validate key generator for non partitioned table
ObjectPath nonPartitionPath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb");
final ResolvedCatalogTable nonPartitionCatalogTable = new
ResolvedCatalogTable(
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 45fc3d6f386..d88bb0326ef 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -29,11 +29,13 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.util.StreamerUtil;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableSchema;
@@ -71,6 +73,7 @@ import java.util.stream.Collectors;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
import static org.apache.hudi.configuration.FlinkOptions.PRECOMBINE_FIELD;
+import static
org.apache.hudi.keygen.constant.KeyGeneratorOptions.RECORDKEY_FIELD_NAME;
import static
org.apache.hudi.table.catalog.HoodieCatalogTestUtils.createHiveConf;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@@ -97,6 +100,26 @@ public class TestHoodieHiveCatalog {
.primaryKey("uuid")
.build();
List<String> partitions = Collections.singletonList("par1");
+
+ TableSchema multiKeySinglePartitionTableSchema =
+ TableSchema.builder()
+ .field("uuid", DataTypes.INT().notNull())
+ .field("name", DataTypes.STRING().notNull())
+ .field("age", DataTypes.INT())
+ .field("par1", DataTypes.STRING())
+ .primaryKey("uuid", "name")
+ .build();
+
+ TableSchema singleKeyMultiPartitionTableSchema =
+ TableSchema.builder()
+ .field("uuid", DataTypes.INT().notNull())
+ .field("name", DataTypes.STRING())
+ .field("par1", DataTypes.STRING())
+ .field("par2", DataTypes.STRING())
+ .primaryKey("uuid")
+ .build();
+ List<String> multiPartitions = Lists.newArrayList("par1", "par2");
+
private static HoodieHiveCatalog hoodieCatalog;
private final ObjectPath tablePath = new ObjectPath("default", "test");
@@ -201,6 +224,28 @@ public class TestHoodieHiveCatalog {
String keyGeneratorClassName =
metaClient.getTableConfig().getKeyGeneratorClassName();
assertEquals(keyGeneratorClassName,
SimpleAvroKeyGenerator.class.getName());
+ // validate single key and multiple partition for partitioned table
+ ObjectPath singleKeyMultiPartitionPath = new ObjectPath("default",
"tb_skmp_" + System.currentTimeMillis());
+ CatalogTable singleKeyMultiPartitionTable =
+ new CatalogTableImpl(singleKeyMultiPartitionTableSchema,
multiPartitions, options, "hudi table");
+ hoodieCatalog.createTable(singleKeyMultiPartitionPath,
singleKeyMultiPartitionTable, false);
+
+ HoodieTableMetaClient singleKeyMultiPartitionTableMetaClient =
+
StreamerUtil.createMetaClient(hoodieCatalog.inferTablePath(singleKeyMultiPartitionPath,
singleKeyMultiPartitionTable), createHiveConf());
+
assertThat(singleKeyMultiPartitionTableMetaClient.getTableConfig().getKeyGeneratorClassName(),
is(ComplexAvroKeyGenerator.class.getName()));
+
+ // validate multiple key and single partition for partitioned table
+ ObjectPath multiKeySinglePartitionPath = new ObjectPath("default",
"tb_mksp_" + System.currentTimeMillis());
+
+ options.remove(RECORDKEY_FIELD_NAME.key());
+ CatalogTable multiKeySinglePartitionTable =
+ new CatalogTableImpl(multiKeySinglePartitionTableSchema, partitions,
options, "hudi table");
+ hoodieCatalog.createTable(multiKeySinglePartitionPath,
multiKeySinglePartitionTable, false);
+
+ HoodieTableMetaClient multiKeySinglePartitionTableMetaClient =
+
StreamerUtil.createMetaClient(hoodieCatalog.inferTablePath(multiKeySinglePartitionPath,
multiKeySinglePartitionTable), createHiveConf());
+
assertThat(multiKeySinglePartitionTableMetaClient.getTableConfig().getKeyGeneratorClassName(),
is(ComplexAvroKeyGenerator.class.getName()));
+
// validate key generator for non partitioned table
ObjectPath nonPartitionPath = new ObjectPath("default", "tb_" + tableType);
CatalogTable nonPartitionTable =