This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 20e8ec79c8 [HUDI-5058] Fix flink catalog read spark table error: primary key col cannot be nullable (#7009)
20e8ec79c8 is described below
commit 20e8ec79c82ebda1a296242332251a6c1d55c400
Author: chao chen <[email protected]>
AuthorDate: Wed Oct 26 20:09:58 2022 +0800
[HUDI-5058] Fix flink catalog read spark table error: primary key col cannot be nullable (#7009)
---
.../hudi/table/catalog/HoodieHiveCatalog.java | 15 ++++++--
.../java/org/apache/hudi/util/DataTypeUtils.java | 44 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 4 deletions(-)
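For context, a minimal sketch of the failure mode this patch addresses (the column names "uuid" and "ts" are hypothetical): a Hudi table initialized from Spark stores its record key fields as nullable in the Avro write schema, so the Flink DataType converted from that schema carries nullable pk columns, and Flink rejects a primary key declared over nullable columns. The sketch below shows the nullable row type such a table produces and the tightened form the patch builds before declaring the primary key.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class NullablePkSketch {
  public static void main(String[] args) {
    // Row type as converted from a Spark-written Avro schema:
    // the record key column "uuid" arrives as a nullable field.
    DataType fromSpark = DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING()),    // record key, but nullable
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)));

    // Flink requires primary key columns to be NOT NULL, so the catalog
    // must tighten the pk column before declaring the key on the schema.
    DataType tightened = DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING().notNull()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)));

    System.out.println(fromSpark);  // ROW<`uuid` STRING, `ts` TIMESTAMP(3)>
    System.out.println(tightened);  // ROW<`uuid` STRING NOT NULL, `ts` TIMESTAMP(3)>
  }
}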
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index c0cd386793..d6e70f16ea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -34,6 +34,7 @@ import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
import org.apache.hudi.sync.common.util.ConfigUtils;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.StreamerUtil;
import org.apache.avro.Schema;
@@ -70,6 +71,7 @@ import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.DataType;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
@@ -402,17 +404,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
String path = hiveTable.getSd().getLocation();
Map<String, String> parameters = hiveTable.getParameters();
Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
+ String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
+ List<String> pkColumns = StringUtils.isNullOrEmpty(pkColumnsStr)
+ ? null : StringUtils.split(pkColumnsStr, ",");
org.apache.flink.table.api.Schema schema;
if (latestTableSchema != null) {
+ // if the table is initialized from spark, the write schema is nullable for pk columns.
+ DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
+ AvroSchemaConverter.convertToDataType(latestTableSchema), pkColumns);
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
- .fromRowDataType(AvroSchemaConverter.convertToDataType(latestTableSchema));
+ .fromRowDataType(tableDataType);
String pkConstraintName = parameters.get(PK_CONSTRAINT_NAME);
- String pkColumns = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
// pkColumns expect not to be null
- builder.primaryKeyNamed(pkConstraintName, StringUtils.split(pkColumns, ","));
+ builder.primaryKeyNamed(pkConstraintName, pkColumns);
} else if (pkColumns != null) {
- builder.primaryKey(StringUtils.split(pkColumns, ","));
+ builder.primaryKey(pkColumns);
}
schema = builder.build();
} else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
index e91432b5e3..c772dc8539 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/DataTypeUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.util;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -26,10 +27,14 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
+import javax.annotation.Nullable;
+
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
/**
* Utilities for {@link org.apache.flink.table.types.DataType}.
@@ -123,4 +128,43 @@ public class DataTypeUtils {
"Can not convert %s to type %s for partition value",
partition, type));
}
}
+
+ /**
+ * Ensures the given columns of the row data type are not nullable (for example, the primary key columns).
+ *
+ * @param dataType The row data type; its logical type must be a {@link RowType}
+ * @param pkColumns The primary key columns
+ * @return a new row data type if any column's nullability was tweaked, otherwise the original data type
+ */
+ public static DataType ensureColumnsAsNonNullable(DataType dataType, @Nullable List<String> pkColumns) {
+ if (pkColumns == null || pkColumns.isEmpty()) {
+ return dataType;
+ }
+ LogicalType dataTypeLogicalType = dataType.getLogicalType();
+ if (!(dataTypeLogicalType instanceof RowType)) {
+ throw new RuntimeException("The data type to be converted must be a row type, but was: " + dataTypeLogicalType.getClass());
+ }
+ RowType rowType = (RowType) dataTypeLogicalType;
+ List<DataType> originalFieldTypes = dataType.getChildren();
+ List<String> fieldNames = rowType.getFieldNames();
+ List<DataType> fieldTypes = new ArrayList<>();
+ boolean tweaked = false;
+ for (int i = 0; i < fieldNames.size(); i++) {
+ if (pkColumns.contains(fieldNames.get(i)) && rowType.getTypeAt(i).isNullable()) {
+ fieldTypes.add(originalFieldTypes.get(i).notNull());
+ tweaked = true;
+ } else {
+ fieldTypes.add(originalFieldTypes.get(i));
+ }
+ }
+ if (!tweaked) {
+ return dataType;
+ }
+ List<DataTypes.Field> fields = new ArrayList<>();
+ for (int i = 0; i < fieldNames.size(); i++) {
+ fields.add(DataTypes.FIELD(fieldNames.get(i), fieldTypes.get(i)));
+ }
+ return DataTypes.ROW(fields.stream().toArray(DataTypes.Field[]::new)).notNull();
+ }
+
}
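A quick usage sketch of the new helper (the column names are hypothetical; the comments show the expected summary strings):

import java.util.Arrays;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.hudi.util.DataTypeUtils;

public class EnsureNonNullableSketch {
  public static void main(String[] args) {
    // A converted schema whose record key column "uuid" is still nullable.
    DataType converted = DataTypes.ROW(
        DataTypes.FIELD("uuid", DataTypes.STRING()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)));

    // Tighten only the pk columns; non-key columns keep their nullability.
    DataType fixed = DataTypeUtils.ensureColumnsAsNonNullable(
        converted, Arrays.asList("uuid"));

    // ROW<`uuid` STRING NOT NULL, `name` STRING, `ts` TIMESTAMP(3)> NOT NULL
    System.out.println(fixed);

    // Without pk columns the original data type is returned as-is.
    System.out.println(
        DataTypeUtils.ensureColumnsAsNonNullable(converted, null) == converted); // true
  }
}

Note that when the helper does rebuild the row, it also marks the row type itself NOT NULL, as seen in the final return statement of the diff above.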