This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d3840a0c02f [HUDI-5058] HoodieCatalog#getTable sets primary key with hoodie.datasource.write.recordkey.field for table initialized via Spark (#7981)
d3840a0c02f is described below
commit d3840a0c02f97d8238fdc00946b8101cbf5217f9
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Feb 20 10:32:45 2023 +0800
[HUDI-5058] HoodieCatalog#getTable sets primary key with hoodie.datasource.write.recordkey.field for table initialized via Spark (#7981)
---
.../apache/hudi/table/catalog/HoodieCatalog.java | 21 +++++++++++++++------
.../hudi/table/catalog/HoodieHiveCatalog.java | 6 +++---
.../hudi/table/catalog/TestHoodieCatalog.java | 7 ++++++-
3 files changed, 24 insertions(+), 10 deletions(-)
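Context for the diff below: HoodieCatalog#getTable previously attached a primary key to the returned CatalogTable only when a pk constraint name had been persisted in the table properties, which happens for tables created through Flink. A table initialized via Spark persists only hoodie.datasource.write.recordkey.field, so its catalog table came back without a primary key. A minimal sketch of the fallback this commit adds, written against Flink's public Schema builder with hypothetical option keys and columns (not the actual Hudi code path):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.Schema;

    public class PkFallbackSketch {
      public static void main(String[] args) {
        // Options as a Spark-initialized table persists them: a record key
        // field, but no pk constraint name.
        Map<String, String> options =
            Map.of("hoodie.datasource.write.recordkey.field", "uuid");

        String pkConstraintName = options.get("pk.constraint.name"); // absent here
        List<String> pkColumns =
            Arrays.asList(options.get("hoodie.datasource.write.recordkey.field").split(","));

        Schema.Builder builder = Schema.newBuilder()
            .column("uuid", DataTypes.STRING().notNull()) // pk columns must be NOT NULL
            .column("name", DataTypes.STRING());

        if (pkConstraintName != null && !pkConstraintName.isEmpty()) {
          builder.primaryKeyNamed(pkConstraintName, pkColumns); // Flink-created table
        } else if (!pkColumns.isEmpty()) {
          builder.primaryKey(pkColumns); // new fallback for Spark-created tables
        }
        System.out.println(builder.build());
      }
    }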
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 15637f575b5..020a0e20f8a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -24,6 +24,8 @@ import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
@@ -31,6 +33,7 @@ import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.DataTypeUtils;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.util.StreamerUtil;
@@ -66,8 +69,8 @@ import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.util.CollectionUtil;
-import org.apache.flink.util.StringUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -158,7 +161,7 @@ public class HoodieCatalog extends AbstractCatalog {
@Override
public boolean databaseExists(String databaseName) throws CatalogException {
- checkArgument(!StringUtils.isNullOrWhitespaceOnly(databaseName));
+ checkArgument(!StringUtils.isNullOrEmpty(databaseName));
return listDatabases().contains(databaseName);
}
@@ -250,11 +253,17 @@ public class HoodieCatalog extends AbstractCatalog {
Map<String, String> options = TableOptionProperties.loadFromProperties(path, hadoopConf);
final Schema latestSchema = getLatestTableSchema(path);
if (latestSchema != null) {
+ List<String> pkColumns = TableOptionProperties.getPkColumns(options);
+ // if the table is initialized from spark, the write schema is nullable for pk columns.
+ DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
+ AvroSchemaConverter.convertToDataType(latestSchema), pkColumns);
org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder()
- .fromRowDataType(AvroSchemaConverter.convertToDataType(latestSchema));
+ .fromRowDataType(tableDataType);
final String pkConstraintName = TableOptionProperties.getPkConstraintName(options);
- if (pkConstraintName != null) {
- builder.primaryKeyNamed(pkConstraintName, TableOptionProperties.getPkColumns(options));
+ if (!StringUtils.isNullOrEmpty(pkConstraintName)) {
+ builder.primaryKeyNamed(pkConstraintName, pkColumns);
+ } else if (!CollectionUtils.isNullOrEmpty(pkColumns)) {
+ builder.primaryKey(pkColumns);
}
final org.apache.flink.table.api.Schema schema = builder.build();
return CatalogTable.of(
@@ -333,7 +342,7 @@ public class HoodieCatalog extends AbstractCatalog {
try {
StreamerUtil.initTableIfNotExists(conf);
// prepare the non-table-options properties
- if (!StringUtils.isNullOrWhitespaceOnly(resolvedTable.getComment())) {
+ if (!StringUtils.isNullOrEmpty(resolvedTable.getComment())) {
options.put(TableOptionProperties.COMMENT, resolvedTable.getComment());
}
TableOptionProperties.createProperties(tablePathStr, hadoopConf, options);
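The DataTypeUtils.ensureColumnsAsNonNullable call above matters because Flink requires primary key columns to be NOT NULL, while the write schema of a Spark-initialized table leaves the record key columns nullable. A sketch of that conversion rebuilt from Flink's public type API (an illustration under those assumptions, not Hudi's implementation):

    import java.util.List;

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.RowType;

    public final class NonNullablePkSketch {
      // Rebuilds a ROW data type with the given columns forced to NOT NULL.
      static DataType ensureColumnsAsNonNullable(DataType rowDataType, List<String> pkColumns) {
        RowType rowType = (RowType) rowDataType.getLogicalType();
        List<DataType> fieldTypes = rowDataType.getChildren();
        DataTypes.Field[] fields = new DataTypes.Field[rowType.getFieldCount()];
        for (int i = 0; i < rowType.getFieldCount(); i++) {
          String name = rowType.getFieldNames().get(i);
          DataType type = fieldTypes.get(i);
          if (pkColumns != null && pkColumns.contains(name)) {
            type = type.notNull(); // flip the nullable pk column
          }
          fields[i] = DataTypes.FIELD(name, type);
        }
        return DataTypes.ROW(fields);
      }
    }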
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index fd36a39d237..50d7ea31152 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -417,11 +417,11 @@ public class HoodieHiveCatalog extends AbstractCatalog {
String path = hiveTable.getSd().getLocation();
Map<String, String> parameters = hiveTable.getParameters();
Schema latestTableSchema = StreamerUtil.getLatestTableSchema(path, hiveConf);
- String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
- List<String> pkColumns = StringUtils.isNullOrEmpty(pkColumnsStr)
- ? null : StringUtils.split(pkColumnsStr, ",");
org.apache.flink.table.api.Schema schema;
if (latestTableSchema != null) {
+ String pkColumnsStr = parameters.get(FlinkOptions.RECORD_KEY_FIELD.key());
+ List<String> pkColumns = StringUtils.isNullOrEmpty(pkColumnsStr)
+ ? null : StringUtils.split(pkColumnsStr, ",");
// if the table is initialized from spark, the write schema is nullable for pk columns.
DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
AvroSchemaConverter.convertToDataType(latestTableSchema), pkColumns);
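The HoodieHiveCatalog hunk is the same fix with a small reordering: the record key columns are now parsed from the Hive table parameters only when a table schema was actually resolved, since they are only consumed inside that branch. Sketched below with a plain map standing in for the Hive table handle (names are illustrative):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    public final class HivePkColumnsSketch {
      static List<String> pkColumnsIfSchemaPresent(Map<String, String> parameters,
                                                   Object latestTableSchema) {
        if (latestTableSchema == null) {
          return null; // no schema resolved, so no pk columns are needed
        }
        String pkColumnsStr = parameters.get("hoodie.datasource.write.recordkey.field");
        return (pkColumnsStr == null || pkColumnsStr.isEmpty())
            ? null
            : Arrays.asList(pkColumnsStr.split(","));
      }
    }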
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index 12b766ba18d..4bafaae952b 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -56,6 +56,7 @@ import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -111,7 +112,11 @@ public class TestHoodieCatalog {
.getLogicalType()
.getTypeRoot()
.equals(LogicalTypeRoot.VARCHAR)) {
- return Column.physical(col.getName(), DataTypes.STRING());
+ DataType dataType = DataTypes.STRING();
+ if ("uuid".equals(col.getName())) {
+ dataType = dataType.notNull();
+ }
+ return Column.physical(col.getName(), dataType);
} else {
return col;
}
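The test change mirrors the new catalog behavior: with "uuid" as the record key, the expected physical column becomes STRING NOT NULL instead of plain STRING. In isolation the adjusted expectation reads roughly like this (a sketch using Flink's Column API; the printed form is an assumption about its summary string):

    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.catalog.Column;
    import org.apache.flink.table.types.DataType;

    public class ExpectedColumnSketch {
      public static void main(String[] args) {
        String colName = "uuid";
        DataType dataType = DataTypes.STRING();
        if ("uuid".equals(colName)) {
          dataType = dataType.notNull(); // the record key column must be NOT NULL
        }
        Column col = Column.physical(colName, dataType);
        System.out.println(col.asSummaryString()); // e.g. `uuid` STRING NOT NULL
      }
    }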