This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new db2ab9a [HUDI-2403] Add metadata table listing for flink query source
(#3618)
db2ab9a is described below
commit db2ab9a15092c1423ee308b1bba7dd7e2f5027a8
Author: Danny Chan <[email protected]>
AuthorDate: Wed Sep 8 14:52:39 2021 +0800
[HUDI-2403] Add metadata table listing for flink query source (#3618)
---
.../apache/hudi/configuration/FlinkOptions.java | 2 +-
.../java/org/apache/hudi/source/FileIndex.java | 149 +++++++++++++++++++++
.../org/apache/hudi/table/HoodieTableSource.java | 13 +-
.../sink/utils/StreamWriteFunctionWrapper.java | 3 +
.../java/org/apache/hudi/source/TestFileIndex.java | 72 ++++++++++
.../apache/hudi/table/TestHoodieTableSource.java | 9 +-
6 files changed, 234 insertions(+), 14 deletions(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 8504f6a..6e0ff52 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -72,7 +72,7 @@ public class FlinkOptions extends HoodieConfig {
public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
ConfigOptions
.key("partition.default_name")
.stringType()
- .defaultValue("__DEFAULT_PARTITION__")
+ .defaultValue("default") // keep sync with hoodie style
.withDescription("The default partition name in case the dynamic
partition"
+ " column value is null/empty string");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
new file mode 100644
index 0000000..bf37f12
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A file index which supports listing files efficiently through metadata table.
+ *
+ * <p>It caches the partition paths to avoid redundant look up; call {@link #reset()}
+ * to drop the cache.
+ */
+public class FileIndex {
+  private final Path path;
+  private final HoodieMetadataConfig metadataConfig;
+  private List<String> partitionPaths; // cache of partition paths
+
+  private FileIndex(Path path, Configuration conf) {
+    this.path = path;
+    this.metadataConfig = metadataConfig(conf);
+  }
+
+  /**
+   * Creates a new file index for the table located at the given base path.
+   *
+   * @param path The table base path
+   * @param conf The Flink configuration, read for {@code FlinkOptions.METADATA_ENABLED}
+   */
+  public static FileIndex instance(Path path, Configuration conf) {
+    return new FileIndex(path, conf);
+  }
+
+  /**
+   * Returns the partition path key and values as a list of map, each map item in the list
+   * is a mapping of the partition key name to its actual partition value. For example, say
+   * there is a file path with partition keys [key1, key2, key3]:
+   *
+   * <p><pre>
+   *   -- file:/// ... key1=val1/key2=val2/key3=val3
+   *   -- file:/// ... key1=val4/key2=val5/key3=val6
+   * </pre>
+   *
+   * <p>The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}].
+   *
+   * <p>A partition value that equals {@code defaultParName} is mapped to {@code null}.
+   * In Hive style, path segments that are not of the form {@code key=value} are ignored.
+   *
+   * @param partitionKeys  The partition key list
+   * @param defaultParName The default partition name for nulls
+   * @param hivePartition  Whether the partition path is in Hive style
+   * @return one map per partition path, or an empty list if there are no partition keys
+   *         or the only partition path is the empty string
+   */
+  public List<Map<String, String>> getPartitions(
+      List<String> partitionKeys,
+      String defaultParName,
+      boolean hivePartition) {
+    if (partitionKeys.isEmpty()) {
+      // non partitioned table
+      return Collections.emptyList();
+    }
+    List<String> partitionPaths = getOrBuildPartitionPaths();
+    if (partitionPaths.size() == 1 && partitionPaths.get(0).isEmpty()) {
+      // the only partition path is the empty string: nothing to map
+      return Collections.emptyList();
+    }
+    List<Map<String, String>> partitions = new ArrayList<>();
+    for (String partitionPath : partitionPaths) {
+      // Split on the Hadoop path separator rather than File.separator: String#split takes
+      // a regex, and File.separator is "\" on Windows, which is an invalid pattern.
+      // The same strings are resolved through Hadoop's Path in getFilesInPartitions().
+      String[] paths = partitionPath.split(Path.SEPARATOR);
+      Map<String, String> partitionMapping = new LinkedHashMap<>();
+      if (hivePartition) {
+        for (String segment : paths) {
+          String[] kv = segment.split("=");
+          if (kv.length == 2) {
+            partitionMapping.put(kv[0], defaultParName.equals(kv[1]) ? null : kv[1]);
+          }
+        }
+      } else {
+        // non Hive style: the i-th path segment is the value of the i-th partition key
+        for (int i = 0; i < partitionKeys.size(); i++) {
+          partitionMapping.put(partitionKeys.get(i), defaultParName.equals(paths[i]) ? null : paths[i]);
+        }
+      }
+      partitions.add(partitionMapping);
+    }
+    return partitions;
+  }
+
+  /**
+   * Returns all the file statuses under the table base path.
+   *
+   * <p>The files are listed per cached partition path and flattened into one array.
+   */
+  public FileStatus[] getFilesInPartitions() {
+    String[] partitions = getOrBuildPartitionPaths().stream()
+        .map(p -> new Path(path, p).toString())
+        .toArray(String[]::new);
+    // NOTE(review): the last argument is hard-coded to "/tmp/"; presumably a scratch
+    // directory used by the listing utility - confirm and make it configurable if so.
+    return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
+            partitions, "/tmp/")
+        .values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
+  }
+
+  /**
+   * Reset the state of the file index.
+   *
+   * <p>Clears the cached partition paths so that the next look up lists them again.
+   */
+  @VisibleForTesting
+  public void reset() {
+    this.partitionPaths = null;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  /**
+   * Returns the cached partition paths, listing them on first access.
+   */
+  private List<String> getOrBuildPartitionPaths() {
+    if (this.partitionPaths == null) {
+      this.partitionPaths = FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT,
+          metadataConfig, path.toString());
+    }
+    return this.partitionPaths;
+  }
+
+  /**
+   * Builds the metadata config from the Flink configuration.
+   */
+  private static HoodieMetadataConfig metadataConfig(Configuration conf) {
+    Properties properties = new Properties();
+
+    // set up metadata.enabled=true in table DDL to enable metadata listing
+    properties.put(HoodieMetadataConfig.ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+    properties.put(HoodieMetadataConfig.SYNC_ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+    properties.put(HoodieMetadataConfig.VALIDATE_ENABLE, false);
+
+    return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
+  }
+}
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 78d1db6..43743fc 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.hadoop.HoodieROTablePathFilter;
+import org.apache.hudi.source.FileIndex;
import org.apache.hudi.source.StreamReadMonitoringFunction;
import org.apache.hudi.source.StreamReadOperator;
import org.apache.hudi.table.format.FilePathUtils;
@@ -116,6 +117,7 @@ public class HoodieTableSource implements
private final List<String> partitionKeys;
private final String defaultPartName;
private final Configuration conf;
+ private final FileIndex fileIndex;
private int[] requiredPos;
private long limit;
@@ -147,6 +149,7 @@ public class HoodieTableSource implements
this.partitionKeys = partitionKeys;
this.defaultPartName = defaultPartName;
this.conf = conf;
+ this.fileIndex = FileIndex.instance(this.path, this.conf);
this.requiredPartitions = requiredPartitions;
this.requiredPos = requiredPos == null
? IntStream.range(0, schema.getColumnCount()).toArray()
@@ -222,8 +225,8 @@ public class HoodieTableSource implements
@Override
public Optional<List<Map<String, String>>> listPartitions() {
- List<Map<String, String>> partitions = FilePathUtils.getPartitions(path,
hadoopConf,
- partitionKeys, defaultPartName,
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
+ List<Map<String, String>> partitions = this.fileIndex.getPartitions(
+ this.partitionKeys, defaultPartName,
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
return Optional.of(partitions);
}
@@ -277,10 +280,7 @@ public class HoodieTableSource implements
if (paths.length == 0) {
return Collections.emptyList();
}
- FileStatus[] fileStatuses = Arrays.stream(paths)
- .flatMap(path ->
- Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1,
hadoopConf)))
- .toArray(FileStatus[]::new);
+ FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
if (fileStatuses.length == 0) {
throw new HoodieException("No files found for reading in user provided
path.");
}
@@ -492,6 +492,7 @@ public class HoodieTableSource implements
public void reset() {
this.metaClient.reloadActiveTimeline();
this.requiredPartitions = null;
+ this.fileIndex.reset();
}
/**
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 6b6bede..c5d3ec5 100644
---
a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -142,6 +142,9 @@ public class StreamWriteFunctionWrapper<I> {
public void openFunction() throws Exception {
this.coordinator.start();
this.coordinator.setExecutor(new
MockCoordinatorExecutor(coordinatorContext));
+ if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
+ this.coordinator.setMetadataSyncExecutor(new
MockCoordinatorExecutor(coordinatorContext));
+ }
toHoodieFunction = new
RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
toHoodieFunction.setRuntimeContext(runtimeContext);
toHoodieFunction.open(conf);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
new file mode 100644
index 0000000..b1f442f
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link FileIndex}.
+ */
+public class TestFileIndex {
+  @TempDir
+  File tempFile;
+
+  /**
+   * Writes the insert data set with the metadata table enabled, then checks that the
+   * file index reports the four expected partitions and four parquet files.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testFileListingUsingMetadata(boolean hiveStylePartitioning) throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    FileIndex fileIndex = FileIndex.instance(new Path(basePath), conf);
+
+    // partition listing: every mapping holds exactly the single partition key
+    List<String> keys = Collections.singletonList("partition");
+    List<Map<String, String>> mappings = fileIndex.getPartitions(keys, "default", hiveStylePartitioning);
+    assertTrue(mappings.stream().noneMatch(mapping -> mapping.size() != 1));
+    String joinedValues = mappings.stream()
+        .map(Map::values)
+        .flatMap(Collection::stream)
+        .sorted()
+        .collect(Collectors.joining(","));
+    assertThat("should have 4 partitions", joinedValues, is("par1,par2,par3,par4"));
+
+    // file listing: one parquet file per partition
+    FileStatus[] statuses = fileIndex.getFilesInPartitions();
+    assertThat(statuses.length, is(4));
+    final String parquetExtension = HoodieFileFormat.PARQUET.getFileExtension();
+    assertTrue(Arrays.stream(statuses)
+        .map(status -> status.getPath().toString())
+        .allMatch(filePath -> filePath.endsWith(parquetExtension)));
+  }
+}
diff --git
a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 25742a7..d50a716 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
-import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -37,14 +36,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -64,12 +61,10 @@ public class TestHoodieTableSource {
@TempDir
File tempFile;
- void beforeEach() throws IOException {
+ void beforeEach() throws Exception {
final String path = tempFile.getAbsolutePath();
conf = TestConfigurations.getDefaultConf(path);
- StreamerUtil.initTableIfNotExists(conf);
- IntStream.range(1, 5)
- .forEach(i -> new File(path + File.separator + "par" + i).mkdirs());
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
}
@Test