This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 0f9f4ca897e [HUDI-7189] Fix Flink catalog keygen class of table
properties for non partitioned table (#10227)
0f9f4ca897e is described below
commit 0f9f4ca897e34e947b616a52804b90eadd6e2857
Author: empcl <[email protected]>
AuthorDate: Thu Dec 7 11:51:04 2023 +0800
[HUDI-7189] Fix Flink catalog keygen class of table properties for non
partitioned table (#10227)
---
.../apache/hudi/table/catalog/HoodieCatalog.java | 3 +++
.../hudi/table/catalog/HoodieHiveCatalog.java | 5 ++++
.../hudi/table/catalog/TestHoodieCatalog.java | 27 ++++++++++++++++++++++
.../hudi/table/catalog/TestHoodieHiveCatalog.java | 20 ++++++++++++++++
4 files changed, 55 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 00cea3c3df5..eb083c1eb1e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -32,6 +32,7 @@ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.FlinkWriteClients;
@@ -349,6 +350,8 @@ public class HoodieCatalog extends AbstractCatalog {
final String partitions = String.join(",",
resolvedTable.getPartitionKeys());
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
+ } else {
+ conf.setString(FlinkOptions.KEYGEN_CLASS_NAME.key(),
NonpartitionedAvroKeyGenerator.class.getName());
}
conf.setString(FlinkOptions.TABLE_NAME, tablePath.getObjectName());
try {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 9fb42451e02..12fcf4a23b0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -35,6 +35,7 @@ import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.table.HoodieTableFactory;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.AvroSchemaConverter;
@@ -505,6 +506,10 @@ public class HoodieHiveCatalog extends AbstractCatalog {
flinkConf.setString(FlinkOptions.PARTITION_PATH_FIELD, partitions);
}
+ if (!catalogTable.isPartitioned()) {
+ flinkConf.setString(FlinkOptions.KEYGEN_CLASS_NAME.key(),
NonpartitionedAvroKeyGenerator.class.getName());
+ }
+
if (!flinkConf.getOptional(PATH).isPresent()) {
flinkConf.setString(PATH, inferTablePath(tablePath, catalogTable));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index dc4e0db058a..0207022903b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -66,6 +68,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.io.File;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -248,6 +251,30 @@ public class TestHoodieCatalog {
// test create exist table
assertThrows(TableAlreadyExistException.class,
() -> catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, false));
+
+ // validate key generator for partitioned table
+ HoodieTableMetaClient metaClient =
+ StreamerUtil.createMetaClient(catalog.inferTablePath(catalogPathStr,
tablePath), new org.apache.hadoop.conf.Configuration());
+ String keyGeneratorClassName =
metaClient.getTableConfig().getKeyGeneratorClassName();
+ assertEquals(keyGeneratorClassName,
SimpleAvroKeyGenerator.class.getName());
+
+ // validate key generator for non partitioned table
+ ObjectPath nonPartitionPath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb");
+ final ResolvedCatalogTable nonPartitionCatalogTable = new
ResolvedCatalogTable(
+ CatalogTable.of(
+
Schema.newBuilder().fromResolvedSchema(CREATE_TABLE_SCHEMA).build(),
+ "test",
+ new ArrayList<>(),
+ EXPECTED_OPTIONS),
+ CREATE_TABLE_SCHEMA
+ );
+
+ catalog.createTable(nonPartitionPath, nonPartitionCatalogTable, false);
+
+ metaClient =
+ StreamerUtil.createMetaClient(catalog.inferTablePath(catalogPathStr,
nonPartitionPath), new org.apache.hadoop.conf.Configuration());
+ keyGeneratorClassName =
metaClient.getTableConfig().getKeyGeneratorClassName();
+ assertEquals(keyGeneratorClassName,
NonpartitionedAvroKeyGenerator.class.getName());
}
@Test
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 9eed5e8a5d6..f0e3276026b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieCatalogException;
+import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
+import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
import org.apache.hudi.sink.partitioner.profile.WriteProfiles;
import org.apache.hudi.util.StreamerUtil;
@@ -59,6 +61,7 @@ import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -66,6 +69,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static
org.apache.hudi.table.catalog.HoodieCatalogTestUtils.createHiveConf;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -187,6 +191,22 @@ public class TestHoodieHiveCatalog {
CatalogBaseTable table2 = hoodieCatalog.getTable(tablePath);
assertEquals("id",
table2.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
+
+ // validate key generator for partitioned table
+ HoodieTableMetaClient metaClient =
+ StreamerUtil.createMetaClient(hoodieCatalog.inferTablePath(tablePath,
table), createHiveConf());
+ String keyGeneratorClassName =
metaClient.getTableConfig().getKeyGeneratorClassName();
+ assertEquals(keyGeneratorClassName,
SimpleAvroKeyGenerator.class.getName());
+
+ // validate key generator for non partitioned table
+ ObjectPath nonPartitionPath = new ObjectPath("default", "tb_" + tableType);
+ CatalogTable nonPartitionTable =
+ new CatalogTableImpl(schema, new ArrayList<>(), options, "hudi table");
+ hoodieCatalog.createTable(nonPartitionPath, nonPartitionTable, false);
+
+ metaClient =
StreamerUtil.createMetaClient(hoodieCatalog.inferTablePath(nonPartitionPath,
nonPartitionTable), createHiveConf());
+ keyGeneratorClassName =
metaClient.getTableConfig().getKeyGeneratorClassName();
+ assertEquals(keyGeneratorClassName,
NonpartitionedAvroKeyGenerator.class.getName());
}
@Test