This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 48b1737b30d [HUDI-7136] In the dfs catalog scenario, solve the problem of Primary key definition is missing (#10162)
48b1737b30d is described below

commit 48b1737b30d1c5f9f82306136b13436f3c27ca5b
Author: empcl <[email protected]>
AuthorDate: Thu Dec 7 12:33:00 2023 +0800

    [HUDI-7136] In the dfs catalog scenario, solve the problem of Primary key definition is missing (#10162)
    
    Co-authored-by: chenlei677 <[email protected]>
---
 .../org/apache/hudi/table/catalog/HoodieCatalog.java | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index eb083c1eb1e..e3e686dbb27 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -89,6 +89,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.hudi.configuration.FlinkOptions.RECORD_KEY_FIELD;
 import static org.apache.hudi.table.catalog.CatalogOptions.CATALOG_PATH;
 import static org.apache.hudi.table.catalog.CatalogOptions.DEFAULT_DATABASE;
 
@@ -312,7 +313,7 @@ public class HoodieCatalog extends AbstractCatalog {
     Configuration conf = Configuration.fromMap(options);
     conf.setString(FlinkOptions.PATH, tablePathStr);
     ResolvedSchema resolvedSchema = resolvedTable.getResolvedSchema();
-    if (!resolvedSchema.getPrimaryKey().isPresent()) {
+    if (!resolvedSchema.getPrimaryKey().isPresent() && 
!conf.containsKey(RECORD_KEY_FIELD.key())) {
       throw new CatalogException("Primary key definition is missing");
     }
     final String avroSchema = AvroSchemaConverter.convertToSchema(
@@ -326,10 +327,19 @@ public class HoodieCatalog extends AbstractCatalog {
     // because the HoodieTableMetaClient is a heavy impl, we try to avoid 
initializing it
     // when calling #getTable.
 
-    final String pkColumns = String.join(",", 
resolvedSchema.getPrimaryKey().get().getColumns());
-    conf.setString(FlinkOptions.RECORD_KEY_FIELD, pkColumns);
-    options.put(TableOptionProperties.PK_CONSTRAINT_NAME, 
resolvedSchema.getPrimaryKey().get().getName());
-    options.put(TableOptionProperties.PK_COLUMNS, pkColumns);
+    //set pk
+    if (resolvedSchema.getPrimaryKey().isPresent()
+            && !conf.containsKey(FlinkOptions.RECORD_KEY_FIELD.key())) {
+      final String pkColumns = String.join(",", 
resolvedSchema.getPrimaryKey().get().getColumns());
+      conf.setString(RECORD_KEY_FIELD, pkColumns);
+    }
+
+    if (resolvedSchema.getPrimaryKey().isPresent()) {
+      options.put(TableOptionProperties.PK_CONSTRAINT_NAME, 
resolvedSchema.getPrimaryKey().get().getName());
+    }
+    if (conf.containsKey(RECORD_KEY_FIELD.key())) {
+      options.put(TableOptionProperties.PK_COLUMNS, 
conf.getString(RECORD_KEY_FIELD));
+    }
 
     // check preCombine
     final String preCombineField = 
conf.getString(FlinkOptions.PRECOMBINE_FIELD);

Reply via email to