alexeykudinkin commented on code in PR #7009:
URL: https://github.com/apache/hudi/pull/7009#discussion_r1003684191


##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java:
##########
@@ -397,17 +399,22 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
     String path = hiveTable.getSd().getLocation();
     Map<String, String> parameters = hiveTable.getParameters();
     Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
+    String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
+    List<String> pkColumns = StringUtils.isNullOrEmpty(pkColumnsStr)
+        ? null : StringUtils.split(pkColumnsStr, ",");
     org.apache.flink.table.api.Schema schema;
     if (latestTableSchema != null) {
+      // if the table is initialized from spark, the write schema is nullable for pk columns.
+      DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(

Review Comment:
   Yeah, I don't think we enforce the invariant in the Spark datasource that primary keys have to be non-null.



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java:
##########
@@ -123,4 +128,39 @@ public static Object resolvePartition(String partition, DataType type) {
                 "Can not convert %s to type %s for partition value", partition, type));
     }
   }
+
+  /**
+   * Ensures the given columns of the row data type are not nullable (for example, the primary keys).
+   *
+   * @param dataType  The row data type
+   * @param pkColumns The primary keys
+   * @return a new row data type if any column nullability is tweaked or the original data type
+   */
+  public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable List<String> pkColumns) {
+    if (pkColumns == null || pkColumns.isEmpty()) {
+      return dataType;
+    }
+    RowType rowType = (RowType) dataType.getLogicalType();
+    List<DataType> oriFieldTypes = dataType.getChildren();

Review Comment:
   nit: `originalFieldTypes`



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java:
##########
@@ -123,4 +128,39 @@ public static Object resolvePartition(String partition, DataType type) {
                 "Can not convert %s to type %s for partition value", partition, type));
     }
   }
+
+  /**
+   * Ensures the given columns of the row data type are not nullable (for example, the primary keys).
+   *
+   * @param dataType  The row data type
+   * @param pkColumns The primary keys
+   * @return a new row data type if any column nullability is tweaked or the original data type
+   */
+  public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable List<String> pkColumns) {
+    if (pkColumns == null || pkColumns.isEmpty()) {
+      return dataType;
+    }
+    RowType rowType = (RowType) dataType.getLogicalType();
+    List<DataType> oriFieldTypes = dataType.getChildren();
+    List<String> fieldNames = rowType.getFieldNames();
+    List<DataType> fieldTypes = new ArrayList<>();
+    boolean tweaked = false;
+    for (int i = 0; i < fieldNames.size(); i++) {
+      if (pkColumns.contains(fieldNames.get(i)) && rowType.getTypeAt(i).isNullable()) {
+        fieldTypes.add(oriFieldTypes.get(i).notNull());
+        tweaked = true;
+      } else {
+        fieldTypes.add(oriFieldTypes.get(i));
+      }
+    }
+    if (!tweaked) {
+      return dataType;
+    }
+    List<DataTypes.Field> fields = new ArrayList<>();
+    for (int i = 0; i < fieldNames.size(); i++) {
+      fields.add(DataTypes.FIELD(fieldNames.get(i), fieldTypes.get(i)));
+    }
+    return DataTypes.ROW(fields.toArray(new DataTypes.Field[fields.size()])).notNull();

Review Comment:
   Better to do: `fields.toArray(DataTypes.Field[]::new)`
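   For reference, the suggested form uses `Collection.toArray(IntFunction)`, a default method added in Java 11; if this module still has to compile against Java 8, `fields.stream().toArray(DataTypes.Field[]::new)` is the equivalent. A minimal sketch comparing the three idioms, using `String` in place of `DataTypes.Field` so it runs without Flink on the classpath (the class `ToArrayDemo` is purely illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class ToArrayDemo {
  // Java 11+: Collection.toArray(IntFunction<T[]>) builds a correctly typed array.
  static String[] viaGenerator(List<String> names) {
    return names.toArray(String[]::new);
  }

  // Java 8-compatible alternative: Stream.toArray(IntFunction<A[]>).
  static String[] viaStream(List<String> names) {
    return names.stream().toArray(String[]::new);
  }

  // The idiom currently in the diff: pass a presized, typed array.
  static String[] viaPresized(List<String> names) {
    return names.toArray(new String[names.size()]);
  }

  public static void main(String[] args) {
    List<String> names = Arrays.asList("uuid", "name", "ts");
    System.out.println(Arrays.toString(viaGenerator(names))); // [uuid, name, ts]
    System.out.println(Arrays.toString(viaStream(names)));    // [uuid, name, ts]
    System.out.println(Arrays.toString(viaPresized(names)));  // [uuid, name, ts]
  }
}
```

   All three produce an array of the exact runtime element type; the generator forms just avoid spelling out the size.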



##########
hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java:
##########
@@ -123,4 +128,39 @@ public static Object resolvePartition(String partition, DataType type) {
                 "Can not convert %s to type %s for partition value", partition, type));
     }
   }
+
+  /**
+   * Ensures the given columns of the row data type are not nullable (for example, the primary keys).
+   *
+   * @param dataType  The row data type
+   * @param pkColumns The primary keys
+   * @return a new row data type if any column nullability is tweaked or the original data type
+   */
+  public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable List<String> pkColumns) {

Review Comment:
   nit: this method should be generic; there's no reason to couple it to primary keys (we could call it for non-PK use cases as well).
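   A minimal sketch of the generic shape being suggested, assuming the helper keeps its current contract (return the input unchanged when nothing is tweaked). It uses a hypothetical `Column` stand-in instead of Flink's `DataType` so it runs standalone; in the real method the same loop would run over `dataType.getChildren()` and rebuild the row with `DataTypes.ROW(...)`. The parameter is named after columns rather than primary keys, and lookups go through a `HashSet`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class NonNullableColumns {
  // Hypothetical stand-in for a (name, DataType) row field; not a Flink or Hudi class.
  static final class Column {
    final String name;
    final boolean nullable;

    Column(String name, boolean nullable) {
      this.name = name;
      this.nullable = nullable;
    }

    Column notNull() {
      return nullable ? new Column(name, false) : this;
    }

    @Override
    public String toString() {
      return name + (nullable ? "" : " NOT NULL");
    }
  }

  /**
   * Marks the given columns as non-nullable. Nothing here is specific to primary keys.
   * Returns the original list instance when no column's nullability changes.
   */
  static List<Column> ensureColumnsAsNonNullable(List<Column> fields, Collection<String> columnNames) {
    if (columnNames == null || columnNames.isEmpty()) {
      return fields;
    }
    Set<String> targets = new HashSet<>(columnNames); // O(1) lookups instead of List.contains
    List<Column> result = new ArrayList<>(fields.size());
    boolean tweaked = false;
    for (Column c : fields) {
      if (targets.contains(c.name) && c.nullable) {
        result.add(c.notNull());
        tweaked = true;
      } else {
        result.add(c);
      }
    }
    return tweaked ? result : fields;
  }

  public static void main(String[] args) {
    List<Column> fields = Arrays.asList(
        new Column("uuid", true), new Column("name", true), new Column("ts", false));
    // prints [uuid NOT NULL, name, ts NOT NULL]
    System.out.println(ensureColumnsAsNonNullable(fields, Arrays.asList("uuid")));
  }
}
```

   Primary keys then become one caller among others, passing whatever column names need the non-null guarantee.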



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
