This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 22b634b081571f01d2d9de4e94ce240cecc46d7d
Author: chao chen <[email protected]>
AuthorDate: Wed Feb 1 09:45:39 2023 +0800

    [HUDI-5585][flink] Fix Spark alter-table failure on tables created and written by Flink (#7706)

    Co-authored-by: danny0405 <[email protected]>
---
 .../apache/hudi/table/catalog/HiveSchemaUtils.java |  8 ++++---
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  2 +-
 .../hudi/table/catalog/TableOptionProperties.java  | 25 ++++++++++++++++++++--
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  | 15 +++++++++++++
 4 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index 4383b42e9f8..fac507cb7db 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -181,9 +181,11 @@ public class HiveSchemaUtils {
    */
   public static List<FieldSchema> toHiveFieldSchema(TableSchema schema, boolean withOperationField) {
     List<FieldSchema> columns = new ArrayList<>();
-    Collection<String> metaFields = withOperationField
-        ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the set may break sequence
-        : HoodieRecord.HOODIE_META_COLUMNS;
+    Collection<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
+    if (withOperationField) {
+      metaFields.add(HoodieRecord.OPERATION_METADATA_FIELD);
+    }
+
     for (String metaField : metaFields) {
       columns.add(new FieldSchema(metaField, "string", null));
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 6dcdf118415..fd36a39d237 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -597,7 +597,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
     serdeProperties.put("serialization.format", "1");

-    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));
+    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys, withOperationField));

     sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index a0864bbf377..6e327bdc612 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.catalog;

 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -28,6 +29,8 @@ import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.avro.Schema;

 import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -39,7 +42,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
@@ -49,6 +54,7 @@ import java.util.Properties;
 import java.util.stream.Collectors;

 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+import static org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
 import static org.apache.hudi.common.table.HoodieTableMetaClient.AUXILIARYFOLDER_NAME;

 /**
@@ -168,8 +174,10 @@ public class TableOptionProperties {
       CatalogTable catalogTable,
       Configuration hadoopConf,
       Map<String, String> properties,
-      List<String> partitionKeys) {
-    Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
+      List<String> partitionKeys,
+      boolean withOperationField) {
+    RowType rowType = supplementMetaFields((RowType) catalogTable.getSchema().toPhysicalRowDataType().getLogicalType(), withOperationField);
+    Schema schema = AvroSchemaConverter.convertToSchema(rowType);
     MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
     String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
@@ -184,6 +192,19 @@ public class TableOptionProperties {
             e -> e.getKey().equalsIgnoreCase(FlinkOptions.TABLE_TYPE.key()) ? VALUE_MAPPING.get(e.getValue()) : e.getValue()));
   }

+  private static RowType supplementMetaFields(RowType rowType, boolean withOperationField) {
+    Collection<String> metaFields = new ArrayList<>(HoodieRecord.HOODIE_META_COLUMNS);
+    if (withOperationField) {
+      metaFields.add(OPERATION_METADATA_FIELD);
+    }
+    ArrayList<RowType.RowField> rowFields = new ArrayList<>();
+    for (String metaField : metaFields) {
+      rowFields.add(new RowType.RowField(metaField, new VarCharType(10000)));
+    }
+    rowFields.addAll(rowType.getFields());
+    return new RowType(false, rowFields);
+  }
+
   public static Map<String, String> translateSparkTableProperties2Flink(Map<String, String> options) {
     if (options.containsKey(CONNECTOR.key())) {
       return options;
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index 5d27cdadbbb..c697cb92509 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -139,6 +139,21 @@ public class TestHoodieHiveCatalog {
         .collect(Collectors.joining(","));
     assertEquals("par1:string", partitionSchema);

+    // validate spark schema properties
+    String avroSchemaStr = hiveTable.getParameters().get("spark.sql.sources.schema.part.0");
+    String expectedAvroSchemaStr = ""
+        + "{\"type\":\"struct\",\"fields\":[{\"name\":\"_hoodie_commit_time\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_commit_seqno\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_record_key\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_partition_path\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"_hoodie_file_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"uuid\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{}},"
+        + "{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"age\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"ts\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},"
+        + "{\"name\":\"par1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}";
+    assertEquals(expectedAvroSchemaStr, avroSchemaStr);
+
     // validate catalog table
     CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
     assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));
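For reviewers skimming the patch: the root cause appears to be that the Flink catalog published the Spark-facing schema (the spark.sql.sources.schema.* table properties) from the physical columns only, while Hudi also writes its meta columns, so Spark's alter-table schema validation saw a mismatch. The new supplementMetaFields prepends those meta columns before the schema is translated. Below is a minimal, self-contained sketch of that field-ordering rule, not part of the commit; the class name is hypothetical, and the hard-coded names mirror HoodieRecord.HOODIE_META_COLUMNS plus (by assumption) HoodieRecord.OPERATION_METADATA_FIELD = "_hoodie_operation".

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of the commit.
public class MetaFieldSupplementSketch {

  // Mirrors HoodieRecord.HOODIE_META_COLUMNS (the same five names the test asserts on).
  private static final List<String> HOODIE_META_COLUMNS = List.of(
      "_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name");

  // Mirrors HoodieRecord.OPERATION_METADATA_FIELD.
  private static final String OPERATION_METADATA_FIELD = "_hoodie_operation";

  // Same ordering rule as the new supplementMetaFields: meta columns first
  // (optionally including the operation field), then the physical columns.
  static List<String> supplementMetaFields(List<String> physicalFields, boolean withOperationField) {
    List<String> allFields = new ArrayList<>(HOODIE_META_COLUMNS);
    if (withOperationField) {
      allFields.add(OPERATION_METADATA_FIELD);
    }
    allFields.addAll(physicalFields);
    return allFields;
  }

  public static void main(String[] args) {
    // Prints the column order the test expects in spark.sql.sources.schema.part.0:
    // [_hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key,
    //  _hoodie_partition_path, _hoodie_file_name, uuid, name, age, ts, par1]
    System.out.println(supplementMetaFields(List.of("uuid", "name", "age", "ts", "par1"), false));
  }
}

Note that the real supplementMetaFields builds a Flink RowType (typing each meta column as VarCharType(10000)) rather than a list of names; the sketch only illustrates the ordering contract that makes the published Spark schema match what Hudi actually writes.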
