This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c54735cc87 [HUDI-4914] Managed memory weight should be set when sort
clustering is enabled (#6792)
c54735cc87 is described below
commit c54735cc8717b8b22e08e61a7360695da2e38d3a
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Sep 26 20:22:50 2022 +0800
[HUDI-4914] Managed memory weight should be set when sort clustering is
enabled (#6792)
---
.../hudi/sink/clustering/ClusteringOperator.java | 39 +++++++++-------------
.../java/org/apache/hudi/sink/utils/Pipelines.java | 10 ++++--
.../apache/hudi/sink/ITTestDataStreamWrite.java | 12 +++++++
3 files changed, 34 insertions(+), 27 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 9b38f0ceea..e30a3577f0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -105,9 +105,6 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
private transient int[] requiredPos;
private transient AvroToRowDataConverters.AvroToRowDataConverter
avroToRowDataConverter;
private transient HoodieFlinkWriteClient writeClient;
- private transient BulkInsertWriterHelper writerHelper;
-
- private transient BinaryExternalSorter sorter;
private transient StreamRecordCollector<ClusteringCommitEvent> collector;
private transient BinaryRowDataSerializer binarySerializer;
@@ -153,10 +150,6 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(rowType);
this.binarySerializer = new
BinaryRowDataSerializer(rowType.getFieldCount());
- if (this.sortClusteringEnabled) {
- initSorter();
- }
-
if (this.asyncClustering) {
this.executor = NonThrownExecutor.builder(LOG).build();
}
@@ -186,6 +179,7 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
if (this.writeClient != null) {
this.writeClient.cleanHandlesGracefully();
this.writeClient.close();
+ this.writeClient = null;
}
}
@@ -203,7 +197,9 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
private void doClustering(String instantTime, ClusteringPlanEvent event)
throws Exception {
final ClusteringGroupInfo clusteringGroupInfo =
event.getClusteringGroupInfo();
- initWriterHelper(instantTime);
+ BulkInsertWriterHelper writerHelper = new
BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
+ instantTime, this.taskID,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getAttemptNumber(),
+ this.rowType);
List<ClusteringOperation> clusteringOps =
clusteringGroupInfo.getOperations();
boolean hasLogFiles = clusteringOps.stream().anyMatch(op ->
op.getDeltaFilePaths().size() > 0);
@@ -220,33 +216,27 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
if (this.sortClusteringEnabled) {
+ BinaryExternalSorter sorter = initSorter();
while (iterator.hasNext()) {
RowData rowData = iterator.next();
BinaryRowData binaryRowData =
rowDataSerializer.toBinaryRow(rowData).copy();
- this.sorter.write(binaryRowData);
+ sorter.write(binaryRowData);
}
BinaryRowData row = binarySerializer.createInstance();
while ((row = sorter.getIterator().next(row)) != null) {
- this.writerHelper.write(row);
+ writerHelper.write(row);
}
+ sorter.close();
} else {
while (iterator.hasNext()) {
- this.writerHelper.write(iterator.next());
+ writerHelper.write(iterator.next());
}
}
- List<WriteStatus> writeStatuses =
this.writerHelper.getWriteStatuses(this.taskID);
+ List<WriteStatus> writeStatuses =
writerHelper.getWriteStatuses(this.taskID);
collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses,
this.taskID));
- this.writerHelper = null;
- }
-
- private void initWriterHelper(String clusteringInstantTime) {
- if (this.writerHelper == null) {
- this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table,
this.writeConfig,
- clusteringInstantTime, this.taskID,
getRuntimeContext().getNumberOfParallelSubtasks(),
getRuntimeContext().getAttemptNumber(),
- this.rowType);
- }
+ writerHelper.close();
}
/**
@@ -338,13 +328,13 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
.toArray();
}
- private void initSorter() {
+ private BinaryExternalSorter initSorter() {
ClassLoader cl = getContainingTask().getUserCodeClassLoader();
NormalizedKeyComputer computer =
createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
RecordComparator comparator =
createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);
MemoryManager memManager =
getContainingTask().getEnvironment().getMemoryManager();
- this.sorter =
+ BinaryExternalSorter sorter =
new BinaryExternalSorter(
this.getContainingTask(),
memManager,
@@ -355,12 +345,13 @@ public class ClusteringOperator extends
TableStreamOperator<ClusteringCommitEven
computer,
comparator,
getContainingTask().getJobConfiguration());
- this.sorter.startThreads();
+ sorter.startThreads();
// register the metrics.
getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>)
sorter::getUsedMemoryInBytes);
getMetricGroup().gauge("numSpillFiles", (Gauge<Long>)
sorter::getNumSpillFiles);
getMetricGroup().gauge("spillInBytes", (Gauge<Long>)
sorter::getSpillInBytes);
+ return sorter;
}
private SortCodeGenerator createSortCodeGenerator() {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 1e4f82a957..82761adf73 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -401,7 +401,7 @@ public class Pipelines {
* @return the clustering pipeline
*/
public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration
conf, RowType rowType, DataStream<Object> dataStream) {
- return dataStream.transform("cluster_plan_generate",
+ DataStream<ClusteringCommitEvent> clusteringStream =
dataStream.transform("cluster_plan_generate",
TypeInformation.of(ClusteringPlanEvent.class),
new ClusteringPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
@@ -413,8 +413,12 @@ public class Pipelines {
.transform("clustering_task",
TypeInformation.of(ClusteringCommitEvent.class),
new ClusteringOperator(conf, rowType))
- .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
- .addSink(new ClusteringCommitSink(conf))
+ .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
+ if (OptionsResolver.sortClusteringEnabled(conf)) {
+ ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
+ conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+ }
+ return clusteringStream.addSink(new ClusteringCommitSink(conf))
.name("clustering_commit")
.setParallelism(1); // clustering commit should be singleton
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 6ee23727d0..e6d2ddb7b5 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -158,10 +158,22 @@ public class ITTestDataStreamWrite extends TestLogger {
@Test
public void testWriteCopyOnWriteWithClustering() throws Exception {
+ testWriteCopyOnWriteWithClustering(false);
+ }
+
+ @Test
+ public void testWriteCopyOnWriteWithSortClustering() throws Exception {
+ testWriteCopyOnWriteWithClustering(true);
+ }
+
+ private void testWriteCopyOnWriteWithClustering(boolean
sortClusteringEnabled) throws Exception {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
conf.setString(FlinkOptions.OPERATION, "insert");
+ if (sortClusteringEnabled) {
+ conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid");
+ }
testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
}