This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch release-0.10.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6c79a68ad91bac8f3bb0e4a97915e6ede90cfe6a Author: Danny Chan <[email protected]> AuthorDate: Fri Dec 3 08:59:10 2021 +0800 [HUDI-2914] Fix remote timeline server config for flink (#4191) (cherry picked from commit 934fe54cc57b508875383cd807735b1323fef754) --- .../sink/partitioner/BucketAssignFunction.java | 6 ++--- .../hudi/sink/partitioner/BucketAssigner.java | 6 +---- .../partitioner/profile/DeltaWriteProfile.java | 12 ++++----- .../sink/partitioner/profile/WriteProfile.java | 29 ++++++---------------- .../java/org/apache/hudi/util/StreamerUtil.java | 8 ++++-- .../hudi/sink/partitioner/TestBucketAssigner.java | 2 +- .../org/apache/hudi/utils/TestStreamerUtil.java | 11 ++++++++ 7 files changed, 34 insertions(+), 40 deletions(-) diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 33f1dd6..73fd668 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -121,16 +121,16 @@ public class BucketAssignFunction<K, I, O extends HoodieRecord<?>> getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getMaxNumberOfParallelSubtasks(), getRuntimeContext().getNumberOfParallelSubtasks(), - ignoreSmallFiles(writeConfig), + ignoreSmallFiles(), HoodieTableType.valueOf(conf.getString(FlinkOptions.TABLE_TYPE)), context, writeConfig); this.payloadCreation = PayloadCreation.instance(this.conf); } - private boolean ignoreSmallFiles(HoodieWriteConfig writeConfig) { + private boolean ignoreSmallFiles() { WriteOperationType operationType = WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION)); - return WriteOperationType.isOverwrite(operationType) || writeConfig.allowDuplicateInserts(); + return WriteOperationType.isOverwrite(operationType); } @Override diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java index f9d5b1c..e73890f 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssigner.java @@ -22,7 +22,6 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.sink.partitioner.profile.WriteProfile; import org.apache.hudi.sink.partitioner.profile.WriteProfiles; -import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.action.commit.SmallFile; @@ -188,6 +187,7 @@ public class BucketAssigner implements AutoCloseable { smallFileAssignMap.put(partitionPath, assign); return assign; } + smallFileAssignMap.put(partitionPath, null); return null; } @@ -211,10 +211,6 @@ public class BucketAssigner implements AutoCloseable { this.writeProfile.reload(checkpointId); } - public HoodieTable<?, ?, ?, ?> getTable() { - return this.writeProfile.getTable(); - } - private boolean fileIdOfThisTask(String fileId) { // the file id can shuffle to this task return KeyGroupRangeAssignment.assignKeyToParallelOperator(fileId, maxParallelism, numTasks) == taskID; diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index 922c056..1f08c5a 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecordLocation; import 
org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.AbstractTableFileSystemView; +import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.commit.SmallFile; @@ -55,13 +55,11 @@ public class DeltaWriteProfile extends WriteProfile { // Find out all eligible small file slices if (!commitTimeline.empty()) { HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); - // initialize the filesystem view based on the commit metadata - initFileSystemView(); - // find smallest file in partition and append to it + // find the smallest file in partition and append to it List<FileSlice> allSmallFileSlices = new ArrayList<>(); // If we can index log files, we can add more inserts to log files for fileIds including those under // pending compaction. - List<FileSlice> allFileSlices = fsView.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) + List<FileSlice> allFileSlices = getFileSystemView().getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp(), true) .collect(Collectors.toList()); for (FileSlice fileSlice : allFileSlices) { if (isSmallFile(fileSlice)) { @@ -91,8 +89,8 @@ public class DeltaWriteProfile extends WriteProfile { return smallFileLocations; } - protected AbstractTableFileSystemView getFileSystemView() { - return (AbstractTableFileSystemView) this.table.getSliceView(); + protected SyncableFileSystemView getFileSystemView() { + return (SyncableFileSystemView) this.table.getSliceView(); } private long getTotalFileSize(FileSlice fileSlice) { diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index 1171a54..98eb29c 100644 --- 
a/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -23,9 +23,10 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import org.apache.hudi.common.table.view.AbstractTableFileSystemView; +import org.apache.hudi.common.table.view.SyncableFileSystemView; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.sink.partitioner.BucketAssigner; import org.apache.hudi.table.HoodieFlinkTable; @@ -92,11 +93,6 @@ public class WriteProfile { private long reloadedCheckpointId; /** - * The file system view cache for one checkpoint interval. - */ - protected AbstractTableFileSystemView fsView; - - /** * Metadata cache to reduce IO of metadata files. 
*/ private final Map<String, HoodieCommitMetadata> metadataCache; @@ -120,8 +116,8 @@ public class WriteProfile { return recordsPerBucket; } - public HoodieTable<?, ?, ?, ?> getTable() { - return table; + public HoodieTableMetaClient getMetaClient() { + return this.table.getMetaClient(); } /** @@ -183,9 +179,7 @@ public class WriteProfile { if (!commitTimeline.empty()) { // if we have some commits HoodieInstant latestCommitTime = commitTimeline.lastInstant().get(); - // initialize the filesystem view based on the commit metadata - initFileSystemView(); - List<HoodieBaseFile> allFiles = fsView + List<HoodieBaseFile> allFiles = getFileSystemView() .getLatestBaseFilesBeforeOrOn(partitionPath, latestCommitTime.getTimestamp()).collect(Collectors.toList()); for (HoodieBaseFile file : allFiles) { @@ -203,15 +197,8 @@ public class WriteProfile { return smallFileLocations; } - @VisibleForTesting - public void initFileSystemView() { - if (fsView == null) { - fsView = getFileSystemView(); - } - } - - protected AbstractTableFileSystemView getFileSystemView() { - return (AbstractTableFileSystemView) this.table.getBaseFileOnlyView(); + protected SyncableFileSystemView getFileSystemView() { + return (SyncableFileSystemView) HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) table.getContext()).getBaseFileOnlyView(); } /** @@ -245,9 +232,7 @@ public class WriteProfile { return; } this.table.getMetaClient().reloadActiveTimeline(); - this.table.getHoodieView().sync(); recordProfile(); - this.fsView = null; cleanMetadataCache(this.table.getMetaClient().getCommitsTimeline().filterCompletedInstants().getInstants()); this.smallFilesMap.clear(); this.reloadedCheckpointId = checkpointId; diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 28a70b9..1e39c92 100644 --- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ 
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -365,6 +365,7 @@ public class StreamerUtil { * * <p>This expects to be used by client, the driver should start an embedded timeline server. */ + @SuppressWarnings("rawtypes") public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( @@ -382,17 +383,20 @@ public class StreamerUtil { * * <p>The task context supplier is a constant: the write token is always '0-1-0'. */ + @SuppressWarnings("rawtypes") public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); + // build the write client to start the embedded timeline server + final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); // create the filesystem view storage properties for client - FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); + final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); // rebuild the view storage config with simplified options. 
FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder() .withStorageType(viewStorageConfig.getStorageType()) .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()).build(); ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt); - return new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); + return writeClient; } /** diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 053c2a3..4f4b549 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -401,7 +401,7 @@ public class TestBucketAssigner { } private static String getLastCompleteInstant(WriteProfile profile) { - return StreamerUtil.getLastCompletedInstant(profile.getTable().getMetaClient()); + return StreamerUtil.getLastCompletedInstant(profile.getMetaClient()); } private void assertBucketEquals( diff --git a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index e00fbfa..57297c5 100644 --- a/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -19,9 +19,12 @@ package org.apache.hudi.utils; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.ViewStorageProperties; import 
org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; @@ -98,5 +101,13 @@ public class TestStreamerUtil { long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower); assertThat(diff, is(75L)); } + + @Test + void testDumpRemoteViewStorageConfig() throws IOException { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + StreamerUtil.createWriteClient(conf); + FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH)); + assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST)); + } }
