This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 20e8ec79c8 [HUDI-5058] Fix flink catalog read spark table error : primary key col can not be nullable (#7009)
20e8ec79c8 is described below

commit 20e8ec79c82ebda1a296242332251a6c1d55c400
Author: chao chen <[email protected]>
AuthorDate: Wed Oct 26 20:09:58 2022 +0800

    [HUDI-5058] Fix flink catalog read spark table error : primary key col can not be nullable (#7009)
---
 .../hudi/table/catalog/HoodieHiveCatalog.java      | 15 ++++++--
 .../java/org/apache/hudi/util/DataTypeUtils.java   | 44 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 4 deletions(-)
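For context on the change below: Flink does not allow a primary key to be declared on nullable columns, while the Avro schema of a table written by Spark leaves the record-key fields nullable, so reading such a table back through the Hive catalog failed. A minimal sketch of the pattern the patch applies when building the Flink schema (the column names and the standalone class here are illustrative, not taken from the commit):

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.types.DataType;

public class PkNullabilitySketch {
  public static void main(String[] args) {
    // Row type as converted from the Avro schema of a Spark-written table:
    // every field, including the record key "uuid", comes back nullable.
    DataType sparkWritten = DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING()),
        DataTypes.FIELD("name", DataTypes.STRING()));

    // The fix: force the key column to NOT NULL before declaring the primary key;
    // Flink's schema resolution otherwise rejects a nullable primary-key column.
    DataType fixed = DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING().notNull()),
        DataTypes.FIELD("name", DataTypes.STRING())).notNull();

    Schema schema = Schema.newBuilder()
        .fromRowDataType(fixed)
        .primaryKey("uuid")
        .build();
    System.out.println(schema);
    System.out.println(sparkWritten); // the original, still-nullable shape
  }
}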

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index c0cd386793..d6e70f16ea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -34,6 +34,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.sync.common.util.ConfigUtils;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.avro.Schema;
@@ -70,6 +71,7 @@ import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.DataType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -402,17 +404,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     String path = hiveTable.getSd().getLocation();
     Map<String, String> parameters = hiveTable.getParameters();
     Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
+    String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
+    List<String> pkColumns = StringUtils.isNullOrEmpty(pkColumnsStr)
+        ? null : StringUtils.split(pkColumnsStr, ",");
     org.apache.flink.table.api.Schema schema;
     if (latestTableSchema != null) {
+      // if the table is initialized from spark, the write schema is nullable for pk columns.
+      DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
+          AvroSchemaConverter.convertToDataType(latestTableSchema), pkColumns);
       org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
-          .fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
+          .fromRowDataType(tableDataType);
       String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
-      String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
       if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
         // pkColumns expect not to be null
-        builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ","));
+        builder.primaryKeyNamed(pkConstraintName, pkColumns);
       } else if (pkColumns != null) {
-        builder.primaryKey(StringUtils.split(pkColumns, ","));
+        builder.primaryKey(pkColumns);
       }
       schema = builder.build();
     } else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index e91432b5e3..c772dc8539 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.util;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -26,10 +27,14 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
 
+import javax.annotation.Nullable;
+
 import java.math.BigDecimal;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 /**
  * Utilities for {@link org.apache.flink.table.types.DataType}.
@@ -123,4 +128,43 @@ public class DataTypeUtils {
                 "Can not convert %s to type %s for partition value", 
partition, type));
     }
   }
+
+  /**
+   * Ensures the given columns of the row data type are not nullable (for example, the primary keys).
+   *
+   * @param dataType  The row data type, whose logical type must be a row type
+   * @param pkColumns The primary keys
+   * @return a new row data type if any column nullability is tweaked, otherwise the original data type
+   */
+  public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable List<String> pkColumns) {
+    if (pkColumns == null || pkColumns.isEmpty()) {
+      return dataType;
+    }
+    LogicalType dataTypeLogicalType = dataType.getLogicalType();
+    if (!(dataTypeLogicalType instanceof RowType)) {
+      throw new RuntimeException("The datatype to be converted must be row type, but this type is: " + dataTypeLogicalType.getClass());
+    }
+    RowType rowType = (RowType) dataTypeLogicalType;
+    List<DataType> originalFieldTypes = dataType.getChildren();
+    List<String> fieldNames = rowType.getFieldNames();
+    List<DataType> fieldTypes = new ArrayList<>();
+    boolean tweaked = false;
+    for (int i = 0; i < fieldNames.size(); i++) {
+      if (pkColumns.contains(fieldNames.get(i)) && rowType.getTypeAt(i).isNullable()) {
+        fieldTypes.add(originalFieldTypes.get(i).notNull());
+        tweaked = true;
+      } else {
+        fieldTypes.add(originalFieldTypes.get(i));
+      }
+    }
+    if (!tweaked) {
+      return dataType;
+    }
+    List<DataTypes.Field> fields = new ArrayList<>();
+    for (int i = 0; i < fieldNames.size(); i++) {
+      fields.add(DataTypes.FIELD(fieldNames.get(i), fieldTypes.get(i)));
+    }
+    return DataTypes.ROW(fields.stream().toArray(DataTypes.Field[]::new)).notNull();
+  }
+
 }
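A rough usage sketch of the new helper (the column names and the standalone class below are illustrative, not part of the commit): only the listed key columns have their nullability tweaked, and the original DataType instance is returned untouched when there is nothing to change.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

import org.apache.hudi.util.DataTypeUtils;

public class EnsureNonNullableSketch {
  public static void main(String[] args) {
    // Shape of the data type converted from the latest Avro table schema:
    // the record key "uuid" is nullable, as a Spark-initialized table writes it.
    DataType converted = DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.BIGINT()));

    List<String> pkColumns = Arrays.asList("uuid");
    DataType adjusted = DataTypeUtils.ensureColumnsAsNonNullable(converted, pkColumns);

    RowType rowType = (RowType) adjusted.getLogicalType();
    // Prints false: the "uuid" field has been forced to NOT NULL.
    System.out.println(rowType.getTypeAt(rowType.getFieldIndex("uuid")).isNullable());

    // With no key columns the method short-circuits and returns the same instance.
    System.out.println(DataTypeUtils.ensureColumnsAsNonNullable(converted, null) == converted);
  }
}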