This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7a1a6837e0 [HUDI-5088]Fix bug:Failed to synchronize the hive metadata
of the Flink table (#7056)
7a1a6837e0 is described below
commit 7a1a6837e0c7be2cb401fbe6be8bbbb72064feae
Author: chao chen <[email protected]>
AuthorDate: Mon Nov 7 10:13:57 2022 +0800
[HUDI-5088]Fix bug:Failed to synchronize the hive metadata of the Flink
table (#7056)
* sync `_hoodie_operation` meta field if changelog mode is enabled.
---
.../main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java | 8 ++++++--
.../java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java | 3 ++-
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index a057c02f2c..4383b42e9f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
@@ -178,9 +179,12 @@ public class HiveSchemaUtils {
/**
* Create Hive field schemas from Flink table schema including the hoodie
metadata fields.
*/
- public static List<FieldSchema> toHiveFieldSchema(TableSchema schema) {
+ public static List<FieldSchema> toHiveFieldSchema(TableSchema schema,
boolean withOperationField) {
List<FieldSchema> columns = new ArrayList<>();
- for (String metaField : HoodieRecord.HOODIE_META_COLUMNS) {
+ Collection<String> metaFields = withOperationField
+ ? HoodieRecord.HOODIE_META_COLUMNS_WITH_OPERATION // caution that the
set may break sequence
+ : HoodieRecord.HOODIE_META_COLUMNS;
+ for (String metaField : metaFields) {
columns.add(new FieldSchema(metaField, "string", null));
}
columns.addAll(createHiveColumns(schema));
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index d6e70f16ea..85f82d53d9 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -553,7 +553,8 @@ public class HoodieHiveCatalog extends AbstractCatalog {
// because since Hive 3.x, there is validation when altering table,
// when the metadata fields are synced through the hive sync tool,
// a compatability issue would be reported.
- List<FieldSchema> allColumns =
HiveSchemaUtils.toHiveFieldSchema(table.getSchema());
+ boolean withOperationField =
Boolean.parseBoolean(table.getOptions().getOrDefault(FlinkOptions.CHANGELOG_ENABLED.key(),
"false"));
+ List<FieldSchema> allColumns =
HiveSchemaUtils.toHiveFieldSchema(table.getSchema(), withOperationField);
// Table columns and partition keys
CatalogTable catalogTable = (CatalogTable) table;