This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2a4e7186883eb97eca7a5072844f47c77fbf1ef4
Author: Danny Chan <[email protected]>
AuthorDate: Fri Dec 3 16:12:59 2021 +0800

    [HUDI-2924] Refresh the fs view on successful checkpoints for write profile (#4199)

    (cherry picked from commit 0699521f83eab843279ff7d78ac2b92fc80ad79b)
---
 .../src/main/java/org/apache/hudi/sink/CleanFunction.java   |  4 +++-
 .../apache/hudi/sink/partitioner/BucketAssignFunction.java  |  2 +-
 .../hudi/sink/partitioner/profile/DeltaWriteProfile.java    |  2 +-
 .../apache/hudi/sink/partitioner/profile/WriteProfile.java  |  9 ++++++++-
 .../src/main/java/org/apache/hudi/util/StreamerUtil.java    | 13 ++++++++++++-
 5 files changed, 25 insertions(+), 5 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
index 26ac9f3..13154b2 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java
@@ -60,7 +60,9 @@ public class CleanFunction<T> extends AbstractRichFunction
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     if (conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) {
-      this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());
+      // do not use the remote filesystem view because the async cleaning service
+      // local timeline is very probably to fall behind with the remote one.
+      this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false);
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
index 73fd668..14cad16 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java
@@ -113,7 +113,7 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>>
   @Override
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
-    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true);
+    HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, false);
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
         new SerializableConfiguration(StreamerUtil.getHadoopConf()),
         new FlinkTaskContextSupplier(getRuntimeContext()));
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index e1c890c..97b6b23 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -59,7 +59,7 @@ public class DeltaWriteProfile extends WriteProfile {
     List<FileSlice> allSmallFileSlices = new ArrayList<>();
     // If we can index log files, we can add more inserts to log files for fileIds including those under
     // pending compaction.
-    List<FileSlice> allFileSlices = getFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
+    List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true)
         .collect(Collectors.toList());
     for (FileSlice fileSlice : allFileSlices) {
       if (isSmallFile(fileSlice)) {
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index d13162f..f6840e5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -94,6 +94,11 @@ public class WriteProfile {
   private long reloadedCheckpointId;

   /**
+   * The file system view cache for one checkpoint interval.
+   */
+  protected SyncableFileSystemView fsView;
+
+  /**
    * Metadata cache to reduce IO of metadata files.
    */
   private final Map<String, HoodieCommitMetadata> metadataCache;
@@ -111,6 +116,7 @@ public class WriteProfile {
     this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
     this.metaClient = StreamerUtil.createMetaClient(config.getBasePath(), context.getHadoopConf().get());
     this.metadataCache = new HashMap<>();
+    this.fsView = getFileSystemView();
     // profile the record statistics on construction
     recordProfile();
   }
@@ -190,7 +196,7 @@ public class WriteProfile {
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      List<HoodieBaseFile> allFiles = getFileSystemView()
+      List<HoodieBaseFile> allFiles = fsView
           .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList());

       for (HoodieBaseFile file : allFiles) {
@@ -243,6 +249,7 @@ public class WriteProfile {
       return;
     }
     this.metaClient.reloadActiveTimeline();
+    this.fsView = getFileSystemView();
     recordProfile();
     cleanMetadataCache(this.metaClient.getCommitsTimeline().filterCompletedInstants().getInstants());
     this.smallFilesMap.clear();
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 1e39c92..c417d7d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -367,12 +367,23 @@ public class StreamerUtil {
    */
   @SuppressWarnings("rawtypes")
   public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) {
+    return createWriteClient(conf, runtimeContext, true);
+  }
+
+  /**
+   * Creates the Flink write client.
+   *
+   * <p>This expects to be used by client, set flag {@code loadFsViewStorageConfig} to use
+   * remote filesystem view storage config, or an in-memory filesystem view storage is used.
+   */
+  @SuppressWarnings("rawtypes")
+  public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) {
     HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(
         new SerializableConfiguration(getHadoopConf()),
         new FlinkTaskContextSupplier(runtimeContext));

-    HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true);
+    HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig);
     return new HoodieFlinkWriteClient<>(context, writeConfig);
   }
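
For context, a minimal sketch of how a Flink operator might choose between the two
StreamerUtil.createWriteClient overloads after this patch. The operator class, its
fields, and the import locations are illustrative assumptions; only the two method
signatures and the "false" flag on the async-cleaning path come from the hunks above.

  import org.apache.flink.api.common.functions.AbstractRichFunction;
  import org.apache.flink.configuration.Configuration;
  import org.apache.hudi.client.HoodieFlinkWriteClient;
  import org.apache.hudi.util.StreamerUtil;

  // Sketch only, not part of this commit.
  public class ExampleWriteOperator extends AbstractRichFunction {
    private final Configuration conf; // Hudi Flink options for the target table
    @SuppressWarnings("rawtypes")
    private transient HoodieFlinkWriteClient writeClient;

    public ExampleWriteOperator(Configuration conf) {
      this.conf = conf;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
      super.open(parameters);
      // Main write path: the existing two-argument overload keeps its behavior and
      // now simply delegates to createWriteClient(conf, runtimeContext, true), i.e.
      // the remote filesystem view storage config is still loaded.
      this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext());

      // Auxiliary services such as the async cleaning in CleanFunction pass false
      // instead, so an in-memory filesystem view is used and a lagging local
      // timeline does not have to stay in sync with the remote view:
      // this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext(), false);
    }
  }

Keeping the two-argument overload as a thin delegate to the new three-argument one
preserves behavior for existing callers while letting services like CleanFunction opt
out of the remote filesystem view.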
