This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 48b1737b30d [HUDI-7136] In the dfs catalog scenario, solve the problem of Primary key definition is missing (#10162)
48b1737b30d is described below
commit 48b1737b30d1c5f9f82306136b13436f3c27ca5b
Author: empcl <[email protected]>
AuthorDate: Thu Dec 7 12:33:00 2023 +0800
[HUDI-7136] In the dfs catalog scenario, solve the problem of Primary key definition is missing (#10162)
Co-authored-by: chenlei677 <[email protected]>
---
.../org/apache/hudi/table/catalog/HoodieCatalog.java | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index eb083c1eb1e..e3e686dbb27 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -89,6 +89,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.hudi.configuration.FlinkOptions.RECORD_KEY_FIELD;
import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
@@ -312,7 +313,7 @@ public class HoodieCatalog extends AbstractCatalog {
Configuration conf = Configuration.fromMap(options);
conf.setString(FlinkOptions.PATH, tablePathStr);
ResolvedSchema resolvedSchema = resolvedTable.getResolvedSchema();
- if (!resolvedSchema.getPrimaryKey().isPresent()) {
+ if (!resolvedSchema.getPrimaryKey().isPresent() && !conf.containsKey(RECORD_KEY_FIELD.key())) {
throw new CatalogException("Primary key definition is missing");
}
final String avroSchema = AvroSchemaConverter.convertToSchema(
@@ -326,10 +327,19 @@ public class HoodieCatalog extends AbstractCatalog {
// because the HoodieTableMetaClient is a heavy impl, we try to avoid initializing it
// when calling #getTable.
- final String pkColumns = String.join(",", resolvedSchema.getPrimaryKey().get().getColumns());
- conf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
- options.put(TableOptionProperties.PK_CONSTRAINT_NAME, resolvedSchema.getPrimaryKey().get().getName());
- options.put(TableOptionProperties.PK_COLUMNS, pkColumns);
+ //set pk
+ if (resolvedSchema.getPrimaryKey().isPresent()
+ && !conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
+ final String pkColumns = String.join(",", resolvedSchema.getPrimaryKey().get().getColumns());
+ conf.setString(RECORD_KEY_FIELD, pkColumns);
+ }
+
+ if (resolvedSchema.getPrimaryKey().isPresent()) {
+ options.put(TableOptionProperties.PK_CONSTRAINT_NAME, resolvedSchema.getPrimaryKey().get().getName());
+ }
+ if (conf.containsKey(RECORD_KEY_FIELD.key())) {
+ options.put(TableOptionProperties.PK_COLUMNS, conf.getString(RECORD_KEY_FIELD));
+ }
// check preCombine
final String preCombineField = conf.getString(FlinkOptions.PRECOMBINE_FIELD);