This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new df7b767a33 [HUDI-5066] Support flink hoodie source metaclient cache (#7017)
df7b767a33 is described below
commit df7b767a332ee21c3139fd224ef4e142fe70ca49
Author: Shizhi Chen <[email protected]>
AuthorDate: Mon Nov 7 17:51:40 2022 +0800
[HUDI-5066] Support flink hoodie source metaclient cache (#7017)
---
.../src/main/java/org/apache/hudi/table/HoodieTableSource.java | 9 +++++----
.../test/java/org/apache/hudi/table/TestHoodieTableSource.java | 9 +++++++++
2 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index 8571390f53..31aba2b2db 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -136,7 +136,7 @@ public class HoodieTableSource implements
List<String> partitionKeys,
String defaultPartName,
Configuration conf) {
- this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null);
+ this(schema, path, partitionKeys, defaultPartName, conf, null, null, null, null, null);
}
public HoodieTableSource(
@@ -148,7 +148,8 @@ public class HoodieTableSource implements
@Nullable FileIndex fileIndex,
@Nullable List<Map<String, String>> requiredPartitions,
@Nullable int[] requiredPos,
- @Nullable Long limit) {
+ @Nullable Long limit,
+ @Nullable HoodieTableMetaClient metaClient) {
this.schema = schema;
this.tableRowType = (RowType) schema.toPhysicalRowDataType().notNull().getLogicalType();
this.path = path;
@@ -164,7 +165,7 @@ public class HoodieTableSource implements
: requiredPos;
this.limit = limit == null ? NO_LIMIT_CONSTANT : limit;
this.hadoopConf = HadoopConfigurations.getHadoopConf(conf);
- this.metaClient = StreamerUtil.metaClientForReader(conf, hadoopConf);
+ this.metaClient = metaClient == null ? StreamerUtil.metaClientForReader(conf, hadoopConf) : metaClient;
this.maxCompactionMemoryInBytes = StreamerUtil.getMaxCompactionMemoryInBytes(conf);
}
@@ -215,7 +216,7 @@ public class HoodieTableSource implements
@Override
public DynamicTableSource copy() {
return new HoodieTableSource(schema, path, partitionKeys, defaultPartName,
- conf, fileIndex, requiredPartitions, requiredPos, limit);
+ conf, fileIndex, requiredPartitions, requiredPos, limit, metaClient);
}
@Override
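The change above threads an optional, already-built HoodieTableMetaClient through the constructor so that copy() can hand the cached instance to the new source instead of rebuilding it from storage via StreamerUtil.metaClientForReader(conf, hadoopConf). Below is a minimal, self-contained Java sketch of that constructor-injection caching pattern; the class and the MetaClient stand-in are hypothetical illustrations, not the actual Hudi types.

// Minimal sketch of the constructor-cached client pattern used in this commit.
// "SimpleSource" and "MetaClient" are hypothetical stand-ins, not Hudi classes.
public class SimpleSource {
  private final MetaClient metaClient;

  // metaClient may be null; only build a new one when no cached instance is supplied.
  public SimpleSource(MetaClient metaClient) {
    this.metaClient = metaClient == null ? MetaClient.create() : metaClient;
  }

  // The copy receives the already-built client, so the expensive load happens once.
  public SimpleSource copy() {
    return new SimpleSource(this.metaClient);
  }

  public MetaClient getMetaClient() {
    return metaClient;
  }

  // Stand-in for an object that is costly to construct (e.g. reads table metadata).
  static class MetaClient {
    static MetaClient create() {
      return new MetaClient();
    }
  }
}

The test added below checks the same behavior: the metaClient of the copied source matches the original's, confirming the cached client is carried over by copy().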
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
index d8093793fc..10a7e44373 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java
@@ -18,6 +18,7 @@
package org.apache.hudi.table;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.utils.TestConfigurations;
@@ -148,6 +149,14 @@ public class TestHoodieTableSource {
assertEquals(expectedFilters, actualFilters);
}
+ @Test
+ void testHoodieSourceCachedMetaClient() {
+ HoodieTableSource tableSource = getEmptyStreamingSource();
+ HoodieTableMetaClient metaClient = tableSource.getMetaClient();
+ HoodieTableSource tableSourceCopy = (HoodieTableSource) tableSource.copy();
+ assertThat(metaClient, is(tableSourceCopy.getMetaClient()));
+ }
+
private HoodieTableSource getEmptyStreamingSource() {
final String path = tempFile.getAbsolutePath();
conf = TestConfigurations.getDefaultConf(path);