This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f750773109a [HUDI-5275] Fix reading data using the HoodieHiveCatalog will cause the Spark write to fail (#7666)
f750773109a is described below

commit f750773109abd78f5f1b41bb31b27711a7201126
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jan 16 09:39:04 2023 +0800

    [HUDI-5275] Fix reading data using the HoodieHiveCatalog will cause the Spark write to fail (#7666)
    
    Fix the hive_style_partitioning option to be in line with the table config.
    
    Co-authored-by: waywtdcc <[email protected]>
---
 .../hudi/table/catalog/HoodieHiveCatalog.java      | 23 ++++++++++++++++------
 .../java/org/apache/hudi/util/StreamerUtil.java    | 20 +++++++++++++++++++
 2 files changed, 37 insertions(+), 6 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index c7babb09a13..8644435b5ab 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.catalog;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.util.StringUtils;
@@ -383,12 +384,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
         String path = hiveTable.getSd().getLocation();
         parameters.put(PATH.key(), path);
         if (!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
-          Path hoodieTablePath = new Path(path);
-          boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
-              .map(fileStatus -> fileStatus.getPath().getName())
-              .filter(f -> !f.equals(".hoodie") && !f.equals("default"))
-              .anyMatch(FilePathUtils::isHiveStylePartitioning);
-          parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), String.valueOf(hiveStyle));
+          // read the table config first
+          final boolean hiveStyle;
+          HoodieTableConfig tableConfig = StreamerUtil.getTableConfig(path, hiveConf);
+          if (tableConfig != null && tableConfig.contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
+            hiveStyle = Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+          } else {
+            // fallback to the partition path pattern
+            Path hoodieTablePath = new Path(path);
+            hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath, hiveConf).listStatus(hoodieTablePath))
+                .map(fileStatus -> fileStatus.getPath().getName())
+                .filter(f -> !f.equals(".hoodie") && !f.equals("default"))
+                .anyMatch(FilePathUtils::isHiveStylePartitioning);
+          }
+          if (hiveStyle) {
+            parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
+          }
         }
         client.alter_table(tablePath.getDatabaseName(), tablePath.getObjectName(), hiveTable);
       } catch (Exception e) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index d7df3d8b536..de37b790159 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -21,6 +21,7 @@ package org.apache.hudi.util;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -56,6 +57,8 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.StringReader;
@@ -282,6 +285,23 @@ public class StreamerUtil {
     return createMetaClient(conf.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(conf));
   }
 
+  /**
+   * Returns the table config or null if the table does not exist.
+   */
+  @Nullable
+  public static HoodieTableConfig getTableConfig(String basePath, org.apache.hadoop.conf.Configuration hadoopConf) {
+    FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
+    Path metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
+    try {
+      if (fs.exists(metaPath)) {
+        return new HoodieTableConfig(fs, metaPath.toString(), null, null);
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("Get table config error", e);
+    }
+    return null;
+  }
+
   /**
    * Returns the median instant time between the given two instant time.
    */

Reply via email to