This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6fcd0cfc7 [hive] Refactor schema check between Paimon schema and Hive DDL (#1607)
6fcd0cfc7 is described below
commit 6fcd0cfc7ea1667688ef672bafe3ca567368a483
Author: tsreaper <[email protected]>
AuthorDate: Thu Jul 20 12:16:32 2023 +0800
[hive] Refactor schema check between Paimon schema and Hive DDL (#1607)
This closes #1607.
---
.../java/org/apache/paimon/hive/HiveSchema.java | 38 ++++++++++++++--------
1 file changed, 25 insertions(+), 13 deletions(-)
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
index db49c32eb..20afe352e 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/HiveSchema.java
@@ -153,8 +153,13 @@ public class HiveSchema {
LOG.debug(
"Extract schema with exists DDL and exists paimon table,
table location:[{}].",
location);
- checkSchemaMatched(
- columnNames, typeInfos, partitionKeys, partitionTypeInfos,
tableSchema.get());
+
+ boolean isPartitionedTable =
+ partitionTypeInfos.size() > 0
+ // for some Hive compatible system
+ ||
properties.containsKey("TABLE_TOTAL_PARTITIONS");
+ checkFieldsMatched(columnNames, typeInfos, tableSchema.get(),
isPartitionedTable);
+ checkPartitionMatched(partitionKeys, partitionTypeInfos,
tableSchema.get());
// Use paimon table data types and column comments when the paimon
table exists.
// Using paimon data types first because hive's
TypeInfoFactory.timestampTypeInfo
@@ -188,18 +193,20 @@ public class HiveSchema {
try {
return new SchemaManager(FileIO.get(path, context), path).latest();
} catch (IOException e) {
- throw new RuntimeException(e);
+ LOG.warn(
+ "Failed to fetch Paimon table schema from path "
+ + path
+ + ", relying on Hive DDL instead.",
+ e);
+ return Optional.empty();
}
}
- private static void checkSchemaMatched(
+ private static void checkFieldsMatched(
List<String> hiveFieldNames,
List<TypeInfo> hiveFieldTypeInfos,
- List<String> hivePartitionKeys,
- List<TypeInfo> hivePartitionTypeInfos,
- TableSchema tableSchema) {
- // compare field names and type infos
-
+ TableSchema tableSchema,
+ boolean isPartitionedTable) {
Set<String> schemaPartitionKeySet = new
HashSet<>(tableSchema.partitionKeys());
List<String> schemaFieldNames = new ArrayList<>();
List<TypeInfo> schemaFieldTypeInfos = new ArrayList<>();
@@ -207,7 +214,9 @@ public class HiveSchema {
// case #1: if the Hive table is not a partitioned table, pick all
fields
// case #2: if the Hive table is a partitioned table, we only pick
fields which are not
// part of partition keys
- if (hivePartitionKeys.isEmpty() ||
!schemaPartitionKeySet.contains(field.name())) {
+ boolean isPartitionColumn =
+ isPartitionedTable &&
schemaPartitionKeySet.contains(field.name());
+ if (!isPartitionColumn) {
schemaFieldNames.add(field.name());
schemaFieldTypeInfos.add(HiveTypeUtils.logicalTypeToTypeInfo(field.type()));
}
@@ -252,9 +261,12 @@ public class HiveSchema {
+ "Mismatched fields are:\n"
+ String.join("--------------------\n",
mismatched));
}
+ }
- // compare partition keys and type infos
-
+ private static void checkPartitionMatched(
+ List<String> hivePartitionKeys,
+ List<TypeInfo> hivePartitionTypeInfos,
+ TableSchema tableSchema) {
if (hivePartitionKeys.isEmpty()) {
// only partitioned Hive table needs to consider this part
return;
@@ -283,7 +295,7 @@ public class HiveSchema {
+ "\n");
}
- mismatched = new ArrayList<>();
+ List<String> mismatched = new ArrayList<>();
for (int i = 0; i < hivePartitionKeys.size(); i++) {
if (!Objects.equals(hivePartitionKeys.get(i),
schemaPartitionKeys.get(i))
|| !Objects.equals(