This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d3840a0c02f [HUDI-5058] HoodieCatalog#getTable sets primary key with hoodie.datasource.write.recordkey.field for table initialized via Spark (#7981)
d3840a0c02f is described below

commit d3840a0c02f97d8238fdc00946b8101cbf5217f9
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Feb 20 10:32:45 2023 +0800

    [HUDI-5058] HoodieCatalog#getTable sets primary key with hoodie.datasource.write.recordkey.field for table initialized via Spark (#7981)
---
 .../apache/hudi/table/catalog/HoodieCatalog.java    | 21 +++++++++++++++------
 .../hudi/table/catalog/HoodieHiveCatalog.java       |  6 +++---
 .../hudi/table/catalog/TestHoodieCatalog.java       |  7 ++++++-
 3 files changed, 24 insertions(+), 10 deletions(-)
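
For context: Flink rejects a primary key declared over nullable columns at schema resolution time, while a table initialized via Spark persists its record-key fields as nullable in the Avro schema. A minimal, self-contained sketch of that mismatch and the patch-up this commit applies on read (illustrative only, not Hudi code; the class and variable names are made up):

    import java.util.Collections;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;
    import org.apache.flink.table.types.DataType;

    // Illustrative sketch, not Hudi code.
    public class PkNullabilitySketch {
      public static void main(String[] args) {
        // Row type as converted from a Spark-written Avro schema:
        // the record-key column "uuid" comes back nullable.
        DataType sparkWritten = DataTypes.ROW(
            DataTypes.FIELD("uuid", DataTypes.STRING()),   // nullable
            DataTypes.FIELD("age", DataTypes.INT()));

        // Force the key column back to NOT NULL before declaring the
        // primary key, as HoodieCatalog#getTable now does via
        // DataTypeUtils.ensureColumnsAsNonNullable.
        DataType patched = DataTypes.ROW(
            DataTypes.FIELD("uuid", DataTypes.STRING().notNull()),
            DataTypes.FIELD("age", DataTypes.INT()));

        Schema schema = Schema.newBuilder()
            .fromRowDataType(patched)
            .primaryKey(Collections.singletonList("uuid"))
            .build();
        System.out.println(schema);
      }
    }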

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 15637f575b5..020a0e20f8a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -24,6 +24,8 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
@@ -31,6 +33,7 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -66,8 +69,8 @@ import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.StringUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -158,7 +161,7 @@ public class HoodieCatalog extends AbstractCatalog {
 
   @Override
   public boolean databaseExists(String databaseName) throws CatalogException {
-    checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+    checkArgument(!StringUtils.isNullOrEmpty(databaseName));
 
     return listDatabases().contains(databaseName);
   }
@@ -250,11 +253,17 @@ public class HoodieCatalog extends AbstractCatalog {
     Map<String, String> options = TableOptionProperties.loadFromProperties(path, hadoopConf);
     final Schema latestSchema = getLatestTableSchema(path);
     if (latestSchema != null) {
+      List<String> pkColumns = TableOptionProperties.getPkColumns(options);
+      // if the table is initialized from spark, the write schema is nullable for pk columns.
+      DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
+          AvroSchemaConverter.convertToDataType(latestSchema), pkColumns);
       org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
-          .fromRowDataType(AvroSchemaConverter.convertToDataType(latestSchema));
+          .fromRowDataType(tableDataType);
       final String pkConstraintName = TableOptionProperties.getPkConstraintName(options);
-      if (pkConstraintName != null) {
-        builder.primaryKeyNamed(pkConstraintName, TableOptionProperties.getPkColumns(options));
+      if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
+        builder.primaryKeyNamed(pkConstraintName, pkColumns);
+      } else if (!CollectionUtils.isNullOrEmpty(pkColumns)) {
+        builder.primaryKey(pkColumns);
       }
       final org.apache.flink.table.api.Schema schema = builder.build();
       return CatalogTable.of(
@@ -333,7 +342,7 @@ public class HoodieCatalog extends AbstractCatalog {
     try {
       StreamerUtil.initTableIfNotExists(conf);
       // prepare the non-table-options properties
-      if (!StringUtils.isNullOrWhitespaceOnly(resolvedTable.getComment())) {
+      if (!StringUtils.isNullOrEmpty(resolvedTable.getComment())) {
         options.put(TableOptionProperties.COMMENT, resolvedTable.getComment());
       }
       TableOptionProperties.createProperties(tablePathStr, hadoopConf, options);
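
The hunk above delegates the nullability fix to DataTypeUtils.ensureColumnsAsNonNullable. A hypothetical sketch of what such a helper boils down to, rebuilding the ROW type so the named key columns lose their nullability (an assumption about its behavior, not Hudi's actual implementation):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    // Hypothetical sketch, not Hudi's implementation.
    public final class NonNullableKeySketch {

      static DataType withNonNullableKeys(DataType rowDataType, List<String> pkColumns) {
        if (pkColumns == null || pkColumns.isEmpty()) {
          return rowDataType; // nothing to enforce
        }
        RowType rowType = (RowType) rowDataType.getLogicalType();
        List<String> names = rowType.getFieldNames();
        List<DataType> types = rowDataType.getChildren();
        List<DataTypes.Field> fields = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
          DataType fieldType = pkColumns.contains(names.get(i))
              ? types.get(i).notNull() // key columns must be NOT NULL for a Flink pk
              : types.get(i);
          fields.add(DataTypes.FIELD(names.get(i), fieldType));
        }
        return DataTypes.ROW(fields.toArray(new DataTypes.Field[0]));
      }
    }
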
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index fd36a39d237..50d7ea31152 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -417,11 +417,11 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     String path = hiveTable.getSd().getLocation();
     Map<String, String> parameters = hiveTable.getParameters();
     Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
-    String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
-    List<String> pkColumns = StringUtils.isNullOrEmpty(pkColumnsStr)
-        ? null : StringUtils.split(pkColumnsStr, ",");
     org.apache.flink.table.api.Schema schema;
     if (latestTableSchema != null) {
+      String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
+      List<String> pkColumns = StringUtils.isNullOrEmpty(pkColumnsStr)
+          ? null : StringUtils.split(pkColumnsStr, ",");
       // if the table is initialized from spark, the write schema is nullable for pk columns.
       DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
           AvroSchemaConverter.convertToDataType(latestTableSchema), pkColumns);
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 12b766ba18d..4bafaae952b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -56,6 +56,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -111,7 +112,11 @@ public class TestHoodieCatalog {
                     .getLogicalType()
                     .getTypeRoot()
                     .equals(LogicalTypeRoot.VARCHAR)) {
-                  return Column.physical(col.getName(), DataTypes.STRING());
+                  DataType dataType = DataTypes.STRING();
+                  if ("uuid".equals(col.getName())) {
+                    dataType = dataType.notNull();
+                  }
+                  return Column.physical(col.getName(), dataType);
                 } else {
                   return col;
                 }
