This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new df7b767a33 [HUDI-5066] Support flink hoodie source metaclient cache 
(#7017)
df7b767a33 is described below

commit df7b767a332ee21c3139fd224ef4e142fe70ca49
Author: Shizhi Chen <[email protected]>
AuthorDate: Mon Nov 7 17:51:40 2022 +0800

    [HUDI-5066] Support flink hoodie source metaclient cache (#7017)
---
 .../src/main/java/org/apache/hudi/table/HoodieTableSource.java   | 9 +++++----
 .../test/java/org/apache/hudi/table/TestHoodieTableSource.java   | 9 +++++++++
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 8571390f53..31aba2b2db 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -136,7 +136,7 @@ public class HoodieTableSource implements
       List<String> partitionKeys,
       String defaultPartName,
       Configuration conf) {
-    this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, 
null);
+    this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, 
null, null);
   }
 
   public HoodieTableSource(
@@ -148,7 +148,8 @@ public class HoodieTableSource implements
       @Nullable FileIndex fileIndex,
       @Nullable List<Map<String, String>> requiredPartitions,
       @Nullable int[] requiredPos,
-      @Nullable Long limit) {
+      @Nullable Long limit,
+      @Nullable HoodieTableMetaClient metaClient) {
     this.schema = schema;
     this.tableRowType = (RowType) 
schema.toPhysicalRowDataType().notNull().getLogicalType();
     this.path = path;
@@ -164,7 +165,7 @@ public class HoodieTableSource implements
         : requiredPos;
     this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
-    this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
+    this.metaClient = metaClient == null ? 
StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient;
     this.maxCompactionMemoryInBytes = 
StreamerUtil.getMaxCompactionMemoryInBytes(conf);
   }
 
@@ -215,7 +216,7 @@ public class HoodieTableSource implements
   @Override
   public DynamicTableSource copy() {
     return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
-        conf, fileIndex, requiredPartitions, requiredPos, limit);
+        conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient);
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index d8093793fc..10a7e44373 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.utils.TestConfigurations;
@@ -148,6 +149,14 @@ public class TestHoodieTableSource {
     assertEquals(expectedFilters, actualFilters);
   }
 
+  @Test
+  void testHoodieSourceCachedMetaClient() {
+    HoodieTableSource tableSource = getEmptyStreamingSource();
+    HoodieTableMetaClient metaClient = tableSource.getMetaClient();
+    HoodieTableSource tableSourceCopy = (HoodieTableSource) tableSource.copy();
+    assertThat(metaClient, is(tableSourceCopy.getMetaClient()));
+  }
+
   private HoodieTableSource getEmptyStreamingSource() {
     final String path = tempFile.getAbsolutePath();
     conf = TestConfigurations.getDefaultConf(path);

Reply via email to