This is an automated email from the ASF dual-hosted git repository. yuzhaojing pushed a commit to branch release-0.12.1-rc1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 053863081140309f8db4b92b524d3f8ef0de5bc8 Author: Nicholas Jiang <[email protected]> AuthorDate: Mon Sep 26 20:22:50 2022 +0800 [HUDI-4914] Managed memory weight should be set when sort clustering is enabled (#6792) --- .../hudi/sink/clustering/ClusteringOperator.java | 39 +++++++++------------- .../java/org/apache/hudi/sink/utils/Pipelines.java | 10 ++++-- .../apache/hudi/sink/ITTestDataStreamWrite.java | 12 +++++++ 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 9b38f0ceea..e30a3577f0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -105,9 +105,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven private transient int[] requiredPos; private transient AvroToRowDataConverters.AvroToRowDataConverter avroToRowDataConverter; private transient HoodieFlinkWriteClient writeClient; - private transient BulkInsertWriterHelper writerHelper; - - private transient BinaryExternalSorter sorter; private transient StreamRecordCollector<ClusteringCommitEvent> collector; private transient BinaryRowDataSerializer binarySerializer; @@ -153,10 +150,6 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven this.avroToRowDataConverter = AvroToRowDataConverters.createRowConverter(rowType); this.binarySerializer = new BinaryRowDataSerializer(rowType.getFieldCount()); - if (this.sortClusteringEnabled) { - initSorter(); - } - if (this.asyncClustering) { this.executor = NonThrownExecutor.builder(LOG).build(); } @@ -186,6 +179,7 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven if (this.writeClient != null) { this.writeClient.cleanHandlesGracefully(); this.writeClient.close(); + this.writeClient = null; } } @@ -203,7 +197,9 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven private void doClustering(String instantTime, ClusteringPlanEvent event) throws Exception { final ClusteringGroupInfo clusteringGroupInfo = event.getClusteringGroupInfo(); - initWriterHelper(instantTime); + BulkInsertWriterHelper writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig, + instantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), + this.rowType); List<ClusteringOperation> clusteringOps = clusteringGroupInfo.getOperations(); boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> op.getDeltaFilePaths().size() > 0); @@ -220,33 +216,27 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType); if (this.sortClusteringEnabled) { + BinaryExternalSorter sorter = initSorter(); while (iterator.hasNext()) { RowData rowData = iterator.next(); BinaryRowData binaryRowData = rowDataSerializer.toBinaryRow(rowData).copy(); - this.sorter.write(binaryRowData); + sorter.write(binaryRowData); } BinaryRowData row = binarySerializer.createInstance(); while ((row = sorter.getIterator().next(row)) != null) { - this.writerHelper.write(row); + writerHelper.write(row); } + sorter.close(); } else { while (iterator.hasNext()) { - this.writerHelper.write(iterator.next()); + writerHelper.write(iterator.next()); } } - List<WriteStatus> writeStatuses = this.writerHelper.getWriteStatuses(this.taskID); + List<WriteStatus> writeStatuses = writerHelper.getWriteStatuses(this.taskID); collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, this.taskID)); - this.writerHelper = null; - } - - private void initWriterHelper(String clusteringInstantTime) { - if (this.writerHelper == null) { - this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, this.writeConfig, - clusteringInstantTime, this.taskID, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getAttemptNumber(), - this.rowType); - } + writerHelper.close(); } /** @@ -338,13 +328,13 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven .toArray(); } - private void initSorter() { + private BinaryExternalSorter initSorter() { ClassLoader cl = getContainingTask().getUserCodeClassLoader(); NormalizedKeyComputer computer = createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl); RecordComparator comparator = createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl); MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager(); - this.sorter = + BinaryExternalSorter sorter = new BinaryExternalSorter( this.getContainingTask(), memManager, @@ -355,12 +345,13 @@ public class ClusteringOperator extends TableStreamOperator<ClusteringCommitEven computer, comparator, getContainingTask().getJobConfiguration()); - this.sorter.startThreads(); + sorter.startThreads(); // register the metrics. getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes); getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles); getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes); + return sorter; } private SortCodeGenerator createSortCodeGenerator() { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 1e4f82a957..82761adf73 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -401,7 +401,7 @@ public class Pipelines { * @return the clustering pipeline */ public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration conf, RowType rowType, DataStream<Object> dataStream) { - return dataStream.transform("cluster_plan_generate", + DataStream<ClusteringCommitEvent> clusteringStream = dataStream.transform("cluster_plan_generate", TypeInformation.of(ClusteringPlanEvent.class), new ClusteringPlanOperator(conf)) .setParallelism(1) // plan generate must be singleton @@ -413,8 +413,12 @@ public class Pipelines { .transform("clustering_task", TypeInformation.of(ClusteringCommitEvent.class), new ClusteringOperator(conf, rowType)) - .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS)) - .addSink(new ClusteringCommitSink(conf)) + .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS)); + if (OptionsResolver.sortClusteringEnabled(conf)) { + ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(), + conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); + } + return clusteringStream.addSink(new ClusteringCommitSink(conf)) .name("clustering_commit") .setParallelism(1); // compaction commit should be singleton } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java index 6ee23727d0..e6d2ddb7b5 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java @@ -158,10 +158,22 @@ public class ITTestDataStreamWrite extends TestLogger { @Test public void testWriteCopyOnWriteWithClustering() throws Exception { + testWriteCopyOnWriteWithClustering(false); + } + + @Test + public void testWriteCopyOnWriteWithSortClustering() throws Exception { + testWriteCopyOnWriteWithClustering(true); + } + + private void testWriteCopyOnWriteWithClustering(boolean sortClusteringEnabled) throws Exception { Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true); conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1); conf.setString(FlinkOptions.OPERATION, "insert"); + if (sortClusteringEnabled) { + conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid"); + } testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED); }
