This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit e49c2826e415046eceade2d123e213d80c6b108d
Author: Danny Chan <[email protected]>
AuthorDate: Thu Aug 4 09:53:09 2022 +0800

    [HUDI-4531] Wrong partition path for Flink Hive catalog when the partition fields are not at the end (#6292)
---
 .../apache/hudi/table/catalog/HiveSchemaUtils.java | 31 +++++++++++++++++----
 .../hudi/table/catalog/HoodieCatalogUtil.java      | 20 ++++++++++++++
 .../hudi/table/catalog/HoodieHiveCatalog.java      | 32 +++++++++-------------
 .../hudi/table/catalog/TableOptionProperties.java  |  8 ++++--
 .../hudi/table/catalog/TestHoodieHiveCatalog.java  | 28 ++++++++++++-------
 5 files changed, 83 insertions(+), 36 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
index c9590ff4a2..ea965f5c01 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HiveSchemaUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 
 import org.apache.flink.table.api.DataTypes;
@@ -40,6 +41,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -49,11 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 public class HiveSchemaUtils {
   /** Get field names from field schemas. */
   public static List<String> getFieldNames(List<FieldSchema> fieldSchemas) {
-    List<String> names = new ArrayList<>(fieldSchemas.size());
-    for (FieldSchema fs : fieldSchemas) {
-      names.add(fs.getName());
-    }
-    return names;
+    return fieldSchemas.stream().map(FieldSchema::getName).collect(Collectors.toList());
   }
 
   public static org.apache.flink.table.api.Schema convertTableSchema(Table hiveTable) {
@@ -204,4 +202,27 @@ public class HiveSchemaUtils {
     LogicalType logicalType = dataType.getLogicalType();
     return logicalType.accept(new TypeInfoLogicalTypeVisitor(dataType));
   }
+
+  /**
+   * Split the field schemas by the given partition keys.
+   *
+   * @param fieldSchemas  The Hive field schemas.
+   * @param partitionKeys The partition keys.
+   *
+   * @return The pair of (regular columns, partition columns) schema fields
+   */
+  public static Pair<List<FieldSchema>, List<FieldSchema>> splitSchemaByPartitionKeys(
+      List<FieldSchema> fieldSchemas,
+      List<String> partitionKeys) {
+    List<FieldSchema> regularColumns = new ArrayList<>();
+    List<FieldSchema> partitionColumns = new ArrayList<>();
+    for (FieldSchema fieldSchema : fieldSchemas) {
+      if (partitionKeys.contains(fieldSchema.getName())) {
+        partitionColumns.add(fieldSchema);
+      } else {
+        regularColumns.add(fieldSchema);
+      }
+    }
+    return Pair.of(regularColumns, partitionColumns);
+  }
 }
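
A minimal sketch (not part of the patch; field names are illustrative) of how
the new name-based helper behaves when a partition key sits mid-schema:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hudi.common.util.collection.Pair;

    List<FieldSchema> fields = Arrays.asList(
        new FieldSchema("uuid", "int", null),
        new FieldSchema("par1", "string", null),  // partition key, not last
        new FieldSchema("ts", "bigint", null));

    Pair<List<FieldSchema>, List<FieldSchema>> split =
        HiveSchemaUtils.splitSchemaByPartitionKeys(fields, Arrays.asList("par1"));
    // split.getLeft()  -> [uuid, ts]  (regular columns)
    // split.getRight() -> [par1]     (partition columns)
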
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index f546300249..3dc191afb4 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -18,8 +18,10 @@
 
 package org.apache.hudi.table.catalog;
 
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 
+import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -33,6 +35,10 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 import static org.apache.hudi.table.catalog.CatalogOptions.HIVE_SITE_FILE;
@@ -93,4 +99,18 @@ public class HoodieCatalogUtil {
   public static boolean isEmbeddedMetastore(HiveConf hiveConf) {
     return isNullOrWhitespaceOnly(hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS));
   }
+
+  /**
+   * Returns the partition keys of the given table.
+   */
+  public static List<String> getPartitionKeys(CatalogTable table) {
+    // the PARTITIONED BY syntax always has higher priority than the option FlinkOptions#PARTITION_PATH_FIELD
+    if (table.isPartitioned()) {
+      return table.getPartitionKeys();
+    } else if (table.getOptions().containsKey(FlinkOptions.PARTITION_PATH_FIELD.key())) {
+      return Arrays.stream(table.getOptions().get(FlinkOptions.PARTITION_PATH_FIELD.key()).split(","))
+          .collect(Collectors.toList());
+    }
+    return Collections.emptyList();
+  }
 }
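
A hedged usage sketch of that precedence rule (illustrative only; "schema"
stands in for a Flink TableSchema such as the one built in the tests below):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.table.catalog.CatalogTable;
    import org.apache.flink.table.catalog.CatalogTableImpl;
    import org.apache.hudi.configuration.FlinkOptions;

    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "par1,par2");

    // No PARTITIONED BY clause: keys come from the option, split on ','.
    CatalogTable byOption =
        new CatalogTableImpl(schema, Collections.emptyList(), options, "hudi table");
    HoodieCatalogUtil.getPartitionKeys(byOption);  // -> [par1, par2]

    // PARTITIONED BY (par1) declared: the syntax wins over the option.
    CatalogTable byClause =
        new CatalogTableImpl(schema, Collections.singletonList("par1"), options, "hudi table");
    HoodieCatalogUtil.getPartitionKeys(byClause);  // -> [par1]
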
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 1e877b133e..07f6291145 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieCatalogException;
@@ -86,7 +87,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -539,25 +539,19 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     List<FieldSchema> allColumns = HiveSchemaUtils.createHiveColumns(table.getSchema());
 
     // Table columns and partition keys
-    if (table instanceof CatalogTable) {
-      CatalogTable catalogTable = (CatalogTable) table;
-
-      if (catalogTable.isPartitioned()) {
-        int partitionKeySize = catalogTable.getPartitionKeys().size();
-        List<FieldSchema> regularColumns =
-            allColumns.subList(0, allColumns.size() - partitionKeySize);
-        List<FieldSchema> partitionColumns =
-            allColumns.subList(
-                allColumns.size() - partitionKeySize, allColumns.size());
-
-        sd.setCols(regularColumns);
-        hiveTable.setPartitionKeys(partitionColumns);
-      } else {
-        sd.setCols(allColumns);
-        hiveTable.setPartitionKeys(new ArrayList<>());
-      }
+    CatalogTable catalogTable = (CatalogTable) table;
+
+    final List<String> partitionKeys = HoodieCatalogUtil.getPartitionKeys(catalogTable);
+    if (partitionKeys.size() > 0) {
+      Pair<List<FieldSchema>, List<FieldSchema>> splitSchemas = HiveSchemaUtils.splitSchemaByPartitionKeys(allColumns, partitionKeys);
+      List<FieldSchema> regularColumns = splitSchemas.getLeft();
+      List<FieldSchema> partitionColumns = splitSchemas.getRight();
+
+      sd.setCols(regularColumns);
+      hiveTable.setPartitionKeys(partitionColumns);
     } else {
       sd.setCols(allColumns);
+      hiveTable.setPartitionKeys(Collections.emptyList());
     }
 
     HoodieFileFormat baseFileFormat = HoodieFileFormat.PARQUET;
@@ -572,7 +566,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     serdeProperties.put(ConfigUtils.IS_QUERY_AS_RO_TABLE, String.valueOf(!useRealTimeInputFormat));
     serdeProperties.put("serialization.format", "1");
 
-    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark((CatalogTable)table, hiveConf, properties));
+    serdeProperties.putAll(TableOptionProperties.translateFlinkTableProperties2Spark(catalogTable, hiveConf, properties, partitionKeys));
 
     sd.setSerdeInfo(new SerDeInfo(null, serDeClassName, serdeProperties));
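
The heart of the fix: the removed code split allColumns positionally with
subList, assuming partition columns always trail the schema. A sketch of the
difference (illustrative, not part of the patch) using the schema from the
updated test (uuid, name, age, par1, ts; partition key par1):

    import java.util.Arrays;
    import java.util.List;

    List<String> cols = Arrays.asList("uuid", "name", "age", "par1", "ts");

    // Old positional split: take the trailing N columns as partition columns.
    List<String> tail = cols.subList(cols.size() - 1, cols.size());
    // tail -> [ts], but the declared partition key is par1: wrong column.

    // The new name-based split (splitSchemaByPartitionKeys above) matches by
    // partition key name, so par1 is found wherever it appears in the schema.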
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index 9477cd6daf..a0864bbf37 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -164,12 +164,16 @@ public class TableOptionProperties {
     return copied;
   }
 
-  public static Map<String, String> translateFlinkTableProperties2Spark(CatalogTable catalogTable, Configuration hadoopConf, Map<String, String> properties) {
+  public static Map<String, String> translateFlinkTableProperties2Spark(
+      CatalogTable catalogTable,
+      Configuration hadoopConf,
+      Map<String, String> properties,
+      List<String> partitionKeys) {
     Schema schema = AvroSchemaConverter.convertToSchema(catalogTable.getSchema().toPhysicalRowDataType().getLogicalType());
     MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
     String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
-        catalogTable.getPartitionKeys(),
+        partitionKeys,
         sparkVersion,
         4000,
         messageType);
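
For reference, a sketch of how the extra parameter is threaded through from the
catalog (mirroring the call in HoodieHiveCatalog above), so that tables
partitioned only via the PARTITION_PATH_FIELD option also get correct Spark
partition metadata:

    Map<String, String> sparkProps =
        TableOptionProperties.translateFlinkTableProperties2Spark(
            catalogTable, hiveConf, properties,
            HoodieCatalogUtil.getPartitionKeys(catalogTable));
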
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
index da6cde4e89..66ba520af9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieHiveCatalog.java
@@ -66,8 +66,8 @@ public class TestHoodieHiveCatalog {
           .field("uuid", DataTypes.INT().notNull())
           .field("name", DataTypes.STRING())
           .field("age", DataTypes.INT())
-          .field("ts", DataTypes.BIGINT())
           .field("par1", DataTypes.STRING())
+          .field("ts", DataTypes.BIGINT())
           .primaryKey("uuid")
           .build();
   List<String> partitions = Collections.singletonList("par1");
@@ -95,21 +95,29 @@ public class TestHoodieHiveCatalog {
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   public void testCreateAndGetHoodieTable(HoodieTableType tableType) throws Exception {
-    Map<String, String> originOptions = new HashMap<>();
-    originOptions.put(FactoryUtil.CONNECTOR.key(), "hudi");
-    originOptions.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());
+    Map<String, String> options = new HashMap<>();
+    options.put(FactoryUtil.CONNECTOR.key(), "hudi");
+    options.put(FlinkOptions.TABLE_TYPE.key(), tableType.toString());
 
     CatalogTable table =
-        new CatalogTableImpl(schema, partitions, originOptions, "hudi table");
+        new CatalogTableImpl(schema, partitions, options, "hudi table");
     hoodieCatalog.createTable(tablePath, table, false);
 
     CatalogBaseTable table1 = hoodieCatalog.getTable(tablePath);
-    assertEquals(table1.getOptions().get(CONNECTOR.key()), "hudi");
-    assertEquals(table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()), tableType.toString());
-    assertEquals(table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()), "uuid");
+    assertEquals("hudi", table1.getOptions().get(CONNECTOR.key()));
+    assertEquals(tableType.toString(), table1.getOptions().get(FlinkOptions.TABLE_TYPE.key()));
+    assertEquals("uuid", table1.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
     assertNull(table1.getOptions().get(FlinkOptions.PRECOMBINE_FIELD.key()), "preCombine key is not declared");
-    assertEquals(table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames(), Collections.singletonList("uuid"));
-    assertEquals(((CatalogTable)table1).getPartitionKeys(), Collections.singletonList("par1"));
+    assertEquals(Collections.singletonList("uuid"), table1.getUnresolvedSchema().getPrimaryKey().get().getColumnNames());
+    assertEquals(Collections.singletonList("par1"), ((CatalogTable)table1).getPartitionKeys());
+
+    // test explicit primary key
+    options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "id");
+    table = new CatalogTableImpl(schema, partitions, options, "hudi table");
+    hoodieCatalog.alterTable(tablePath, table, true);
+
+    CatalogBaseTable table2 = hoodieCatalog.getTable(tablePath);
+    assertEquals("id", 
table2.getOptions().get(FlinkOptions.RECORD_KEY_FIELD.key()));
   }
 
   @ParameterizedTest
