This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f750773109a [HUDI-5275] Fix issue where reading data using the
HoodieHiveCatalog causes the Spark write to fail (#7666)
f750773109a is described below
commit f750773109abd78f5f1b41bb31b27711a7201126
Author: Danny Chan <[email protected]>
AuthorDate: Mon Jan 16 09:39:04 2023 +0800
[HUDI-5275] Fix issue where reading data using the HoodieHiveCatalog
causes the Spark write to fail (#7666)
Fix the hive_style_partitioning option so that it stays consistent with the table config.
Co-authored-by: waywtdcc <[email protected]>
---
.../hudi/table/catalog/HoodieHiveCatalog.java | 23 ++++++++++++++++------
.../java/org/apache/hudi/util/StreamerUtil.java | 20 +++++++++++++++++++
2 files changed, 37 insertions(+), 6 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index c7babb09a13..8644435b5ab 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table.catalog;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.StringUtils;
@@ -383,12 +384,22 @@ public class HoodieHiveCatalog extends AbstractCatalog {
String path = hiveTable.getSd().getLocation();
parameters.put(PATH.key(), path);
if
(!parameters.containsKey(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
- Path hoodieTablePath = new Path(path);
- boolean hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath,
hiveConf).listStatus(hoodieTablePath))
- .map(fileStatus -> fileStatus.getPath().getName())
- .filter(f -> !f.equals(".hoodie") && !f.equals("default"))
- .anyMatch(FilePathUtils::isHiveStylePartitioning);
- parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(),
String.valueOf(hiveStyle));
+ // read the table config first
+ final boolean hiveStyle;
+ HoodieTableConfig tableConfig = StreamerUtil.getTableConfig(path,
hiveConf);
+ if (tableConfig != null &&
tableConfig.contains(FlinkOptions.HIVE_STYLE_PARTITIONING.key())) {
+ hiveStyle =
Boolean.parseBoolean(tableConfig.getHiveStylePartitioningEnable());
+ } else {
+ // fallback to the partition path pattern
+ Path hoodieTablePath = new Path(path);
+ hiveStyle = Arrays.stream(FSUtils.getFs(hoodieTablePath,
hiveConf).listStatus(hoodieTablePath))
+ .map(fileStatus -> fileStatus.getPath().getName())
+ .filter(f -> !f.equals(".hoodie") && !f.equals("default"))
+ .anyMatch(FilePathUtils::isHiveStylePartitioning);
+ }
+ if (hiveStyle) {
+ parameters.put(FlinkOptions.HIVE_STYLE_PARTITIONING.key(), "true");
+ }
}
client.alter_table(tablePath.getDatabaseName(),
tablePath.getObjectName(), hiveTable);
} catch (Exception e) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index d7df3d8b536..de37b790159 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -21,6 +21,7 @@ package org.apache.hudi.util;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -56,6 +57,8 @@ import org.apache.parquet.hadoop.ParquetFileWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
@@ -282,6 +285,23 @@ public class StreamerUtil {
return createMetaClient(conf.getString(FlinkOptions.PATH),
HadoopConfigurations.getHadoopConf(conf));
}
+ /**
+ * Returns the table config or null if the table does not exist.
+ */
+ @Nullable
+ public static HoodieTableConfig getTableConfig(String basePath,
org.apache.hadoop.conf.Configuration hadoopConf) {
+ FileSystem fs = FSUtils.getFs(basePath, hadoopConf);
+ Path metaPath = new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
+ try {
+ if (fs.exists(metaPath)) {
+ return new HoodieTableConfig(fs, metaPath.toString(), null, null);
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Get table config error", e);
+ }
+ return null;
+ }
+
/**
* Returns the median instant time between the given two instant time.
*/