This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new db2ab9a  [HUDI-2403] Add metadata table listing for flink query source (#3618)
db2ab9a is described below

commit db2ab9a15092c1423ee308b1bba7dd7e2f5027a8
Author: Danny Chan <[email protected]>
AuthorDate: Wed Sep 8 14:52:39 2021 +0800

    [HUDI-2403] Add metadata table listing for flink query source (#3618)
---
 .../apache/hudi/configuration/FlinkOptions.java    |   2 +-
 .../java/org/apache/hudi/source/FileIndex.java     | 149 +++++++++++++++++++++
 .../org/apache/hudi/table/HoodieTableSource.java   |  13 +-
 .../sink/utils/StreamWriteFunctionWrapper.java     |   3 +
 .../java/org/apache/hudi/source/TestFileIndex.java |  72 ++++++++++
 .../apache/hudi/table/TestHoodieTableSource.java   |   9 +-
 6 files changed, 234 insertions(+), 14 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 8504f6a..6e0ff52 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -72,7 +72,7 @@ public class FlinkOptions extends HoodieConfig {
   public static final ConfigOption<String> PARTITION_DEFAULT_NAME = 
ConfigOptions
       .key("partition.default_name")
       .stringType()
-      .defaultValue("__DEFAULT_PARTITION__")
+      .defaultValue("default") // keep sync with hoodie style
       .withDescription("The default partition name in case the dynamic 
partition"
           + " column value is null/empty string");
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
new file mode 100644
index 0000000..bf37f12
--- /dev/null
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A file index which supports listing files efficiently through metadata table.
+ *
+ * <p>It caches the partition paths to avoid redundant look up.
+ */
+public class FileIndex {
+  private final Path path;
+  private final HoodieMetadataConfig metadataConfig;
+  private List<String> partitionPaths; // cache of partition paths
+
+  private FileIndex(Path path, Configuration conf) {
+    this.path = path;
+    this.metadataConfig = metadataConfig(conf);
+  }
+
+  /** Creates a file index for the table under base path {@code path}, configured by {@code conf}. */
+  public static FileIndex instance(Path path, Configuration conf) {
+    return new FileIndex(path, conf);
+  }
+
+  /**
+   * Returns the partition path key and values as a list of map, each map item in the list
+   * is a mapping of the partition key name to its actual partition value. For example, say
+   * there is a file path with partition keys [key1, key2, key3]:
+   *
+   * <p><pre>
+   *   -- file:/// ... key1=val1/key2=val2/key3=val3
+   *   -- file:/// ... key1=val4/key2=val5/key3=val6
+   * </pre>
+   * <p>The return list should be [{key1:val1, key2:val2, key3:val3}, {key1:val4, key2:val5, key3:val6}].
+   *
+   * @param partitionKeys  The partition key list
+   * @param defaultParName The default partition name for nulls
+   * @param hivePartition  Whether the partition path is in Hive style
+   * @return One map per partition path (empty list if non partitioned); values equal to {@code defaultParName} become null
+   */
+  public List<Map<String, String>> getPartitions(
+      List<String> partitionKeys, String defaultParName, boolean hivePartition) {
+    if (partitionKeys.isEmpty()) {
+      // non partitioned table
+      return Collections.emptyList();
+    }
+    List<String> partitionPaths = getOrBuildPartitionPaths();
+    if (partitionPaths.size() == 1 && partitionPaths.get(0).isEmpty()) {
+      return Collections.emptyList();
+    }
+    List<Map<String, String>> partitions = new ArrayList<>();
+    for (String partitionPath : partitionPaths) {
+      // split on "/" (Path.SEPARATOR), not File.separator: "\" on Windows is not even a valid regex for String#split
+      String[] paths = partitionPath.split(Path.SEPARATOR);
+      Map<String, String> partitionMapping = new LinkedHashMap<>();
+      if (hivePartition) {
+        for (String segment : paths) {
+          // only segments of the exact form key=value contribute a mapping
+          String[] kv = segment.split("=");
+          if (kv.length == 2) {
+            partitionMapping.put(kv[0], defaultParName.equals(kv[1]) ? null : kv[1]);
+          }
+        }
+      } else {
+        for (int i = 0; i < partitionKeys.size(); i++) {
+          partitionMapping.put(partitionKeys.get(i), defaultParName.equals(paths[i]) ? null : paths[i]);
+        }
+      }
+      partitions.add(partitionMapping);
+    }
+    return partitions;
+  }
+
+  /**
+   * Returns all the file statuses under the table base path.
+   */
+  public FileStatus[] getFilesInPartitions() {
+    String[] partitions = getOrBuildPartitionPaths().stream().map(p -> new Path(path, p).toString()).toArray(String[]::new);
+    // NOTE(review): "/tmp/" is hard-coded here, presumably a scratch/spill directory; consider making it configurable
+    return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
+            partitions, "/tmp/")
+        .values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
+  }
+
+  /**
+   * Reset the state of the file index.
+   */
+  @VisibleForTesting
+  public void reset() {
+    this.partitionPaths = null;
+  }
+
+  // -------------------------------------------------------------------------
+  //  Utilities
+  // -------------------------------------------------------------------------
+
+  /** Returns the cached partition paths, listing them first when nothing is cached yet. */
+  private List<String> getOrBuildPartitionPaths() {
+    if (this.partitionPaths == null) {
+      this.partitionPaths = FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT,
+          metadataConfig, path.toString());
+    }
+    return this.partitionPaths;
+  }
+
+  /** Builds the metadata config; set up metadata.enabled=true in table DDL to enable metadata listing. */
+  private static HoodieMetadataConfig metadataConfig(Configuration conf) {
+    Properties properties = new Properties();
+    properties.put(HoodieMetadataConfig.ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+    properties.put(HoodieMetadataConfig.SYNC_ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+    properties.put(HoodieMetadataConfig.VALIDATE_ENABLE, false);
+    return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
+  }
+}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 78d1db6..43743fc 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.HoodieROTablePathFilter;
+import org.apache.hudi.source.FileIndex;
 import org.apache.hudi.source.StreamReadMonitoringFunction;
 import org.apache.hudi.source.StreamReadOperator;
 import org.apache.hudi.table.format.FilePathUtils;
@@ -116,6 +117,7 @@ public class HoodieTableSource implements
   private final List<String> partitionKeys;
   private final String defaultPartName;
   private final Configuration conf;
+  private final FileIndex fileIndex;
 
   private int[] requiredPos;
   private long limit;
@@ -147,6 +149,7 @@ public class HoodieTableSource implements
     this.partitionKeys = partitionKeys;
     this.defaultPartName = defaultPartName;
     this.conf = conf;
+    this.fileIndex = FileIndex.instance(this.path, this.conf);
     this.requiredPartitions = requiredPartitions;
     this.requiredPos = requiredPos == null
         ? IntStream.range(0, schema.getColumnCount()).toArray()
@@ -222,8 +225,8 @@ public class HoodieTableSource implements
 
   @Override
   public Optional<List<Map<String, String>>> listPartitions() {
-    List<Map<String, String>> partitions = FilePathUtils.getPartitions(path, 
hadoopConf,
-        partitionKeys, defaultPartName, 
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
+    List<Map<String, String>> partitions = this.fileIndex.getPartitions(
+        this.partitionKeys, defaultPartName, 
conf.getBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING));
     return Optional.of(partitions);
   }
 
@@ -277,10 +280,7 @@ public class HoodieTableSource implements
     if (paths.length == 0) {
       return Collections.emptyList();
     }
-    FileStatus[] fileStatuses = Arrays.stream(paths)
-        .flatMap(path ->
-            Arrays.stream(FilePathUtils.getFileStatusRecursively(path, 1, 
hadoopConf)))
-        .toArray(FileStatus[]::new);
+    FileStatus[] fileStatuses = fileIndex.getFilesInPartitions();
     if (fileStatuses.length == 0) {
       throw new HoodieException("No files found for reading in user provided 
path.");
     }
@@ -492,6 +492,7 @@ public class HoodieTableSource implements
   public void reset() {
     this.metaClient.reloadActiveTimeline();
     this.requiredPartitions = null;
+    this.fileIndex.reset();
   }
 
   /**
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 6b6bede..c5d3ec5 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -142,6 +142,9 @@ public class StreamWriteFunctionWrapper<I> {
   public void openFunction() throws Exception {
     this.coordinator.start();
     this.coordinator.setExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+    if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
+      this.coordinator.setMetadataSyncExecutor(new 
MockCoordinatorExecutor(coordinatorContext));
+    }
     toHoodieFunction = new 
RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
     toHoodieFunction.setRuntimeContext(runtimeContext);
     toHoodieFunction.open(conf);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
new file mode 100644
index 0000000..b1f442f
--- /dev/null
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.source;
+
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.utils.TestConfigurations;
+import org.apache.hudi.utils.TestData;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test cases for {@link FileIndex}: partition and file listing with the metadata table enabled.
+ */
+public class TestFileIndex {
+  @TempDir
+  File tempFile;
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  void testFileListingUsingMetadata(boolean hiveStylePartitioning) throws Exception {
+    final String basePath = tempFile.getAbsolutePath();
+    Configuration conf = TestConfigurations.getDefaultConf(basePath);
+    conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
+    conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, hiveStylePartitioning);
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+    FileIndex index = FileIndex.instance(new Path(basePath), conf);
+    List<Map<String, String>> partitionSpecs = index.getPartitions(Collections.singletonList("partition"), "default", hiveStylePartitioning);
+    assertTrue(partitionSpecs.stream().noneMatch(spec -> spec.size() != 1));
+    String joinedValues = partitionSpecs.stream()
+        .map(Map::values).flatMap(Collection::stream).sorted().collect(Collectors.joining(","));
+    assertThat("should have 4 partitions", joinedValues, is("par1,par2,par3,par4"));
+
+    FileStatus[] files = index.getFilesInPartitions();
+    assertThat(files.length, is(4));
+    assertTrue(Arrays.stream(files).map(file -> file.getPath().toString())
+        .allMatch(location -> location.endsWith(HoodieFileFormat.PARQUET.getFileExtension())));
+  }
+}
diff --git a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index 25742a7..d50a716 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -21,7 +21,6 @@ package org.apache.hudi.table;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
-import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
 
@@ -37,14 +36,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -64,12 +61,10 @@ public class TestHoodieTableSource {
   @TempDir
   File tempFile;
 
-  void beforeEach() throws IOException {
+  void beforeEach() throws Exception {
     final String path = tempFile.getAbsolutePath();
     conf = TestConfigurations.getDefaultConf(path);
-    StreamerUtil.initTableIfNotExists(conf);
-    IntStream.range(1, 5)
-        .forEach(i -> new File(path + File.separator + "par" + i).mkdirs());
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
   }
 
   @Test

Reply via email to