This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 32ecf248e5f [HUDI-7675] Don't set default value for primary key when get schema from hms (#11101)
32ecf248e5f is described below

commit 32ecf248e5fe93ef3d3ce5027c58550eb2966827
Author: hehuiyuan <[email protected]>
AuthorDate: Sat Apr 27 08:07:28 2024 +0800

    [HUDI-7675] Don't set default value for primary key when get schema from hms (#11101)
---
 .../main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java    | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index fac507cb7db..fcdd03b6aba 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -68,8 +68,8 @@ public class HiveSchemaUtils {
     allCols.addAll(hiveTable.getPartitionKeys());
 
     String pkConstraintName = 
hiveTable.getParameters().get(TableOptionProperties.PK_CONSTRAINT_NAME);
-    String pkColumnStr = 
hiveTable.getParameters().getOrDefault(FlinkOptions.RECORD_KEY_FIELD.key(), 
FlinkOptions.RECORD_KEY_FIELD.defaultValue());
-    List<String> pkColumns = StringUtils.split(pkColumnStr, ",");
+    String pkColumnStr = 
hiveTable.getParameters().get(FlinkOptions.RECORD_KEY_FIELD.key());
+    List<String> pkColumns = pkColumnStr == null ? new ArrayList<>() : 
StringUtils.split(pkColumnStr, ",");
 
     String[] colNames = new String[allCols.size()];
     DataType[] colTypes = new DataType[allCols.size()];
@@ -88,7 +88,7 @@ public class HiveSchemaUtils {
     org.apache.flink.table.api.Schema.Builder builder = 
org.apache.flink.table.api.Schema.newBuilder().fromFields(colNames, colTypes);
     if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
       builder.primaryKeyNamed(pkConstraintName, pkColumns);
-    } else {
+    } else if (!pkColumns.isEmpty()) {
       builder.primaryKey(pkColumns);
     }
 

Reply via email to