This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c54735cc87 [HUDI-4914] Managed memory weight should be set when sort 
clustering is enabled (#6792)
c54735cc87 is described below

commit c54735cc8717b8b22e08e61a7360695da2e38d3a
Author: Nicholas Jiang <[email protected]>
AuthorDate: Mon Sep 26 20:22:50 2022 +0800

    [HUDI-4914] Managed memory weight should be set when sort clustering is 
enabled (#6792)
---
 .../hudi/sink/clustering/ClusteringOperator.java   | 39 +++++++++-------------
 .../java/org/apache/hudi/sink/utils/Pipelines.java | 10 ++++--
 .../apache/hudi/sink/ITTestDataStreamWrite.java    | 12 +++++++
 3 files changed, 34 insertions(+), 27 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 9b38f0ceea..e30a3577f0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -105,9 +105,6 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
   private transient int[] requiredPos;
   private transient AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
   private transient HoodieFlinkWriteClient writeClient;
-  private transient BulkInsertWriterHelper writerHelper;
-
-  private transient BinaryExternalSorter sorter;
   private transient StreamRecordCollector<ClusteringCommitEvent> collector;
   private transient BinaryRowDataSerializer binarySerializer;
 
@@ -153,10 +150,6 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
     this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(rowType);
     this.binarySerializer = new 
BinaryRowDataSerializer(rowType.getFieldCount());
 
-    if (this.sortClusteringEnabled) {
-      initSorter();
-    }
-
     if (this.asyncClustering) {
       this.executor = NonThrownExecutor.builder(LOG).build();
     }
@@ -186,6 +179,7 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
     if (this.writeClient != null) {
       this.writeClient.cleanHandlesGracefully();
       this.writeClient.close();
+      this.writeClient = null;
     }
   }
 
@@ -203,7 +197,9 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
   private void doClustering(String instantTime, ClusteringPlanEvent event) 
throws Exception {
     final ClusteringGroupInfo clusteringGroupInfo = 
event.getClusteringGroupInfo();
 
-    initWriterHelper(instantTime);
+    BulkInsertWriterHelper writerHelper = new 
BulkInsertWriterHelper(this.conf, this.table, this.writeConfig,
+        instantTime, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
+        this.rowType);
 
     List<ClusteringOperation> clusteringOps = 
clusteringGroupInfo.getOperations();
     boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
op.getDeltaFilePaths().size() > 0);
@@ -220,33 +216,27 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
     RowDataSerializer rowDataSerializer = new RowDataSerializer(rowType);
 
     if (this.sortClusteringEnabled) {
+      BinaryExternalSorter sorter = initSorter();
       while (iterator.hasNext()) {
         RowData rowData = iterator.next();
         BinaryRowData binaryRowData = 
rowDataSerializer.toBinaryRow(rowData).copy();
-        this.sorter.write(binaryRowData);
+        sorter.write(binaryRowData);
       }
 
       BinaryRowData row = binarySerializer.createInstance();
       while ((row = sorter.getIterator().next(row)) != null) {
-        this.writerHelper.write(row);
+        writerHelper.write(row);
       }
+      sorter.close();
     } else {
       while (iterator.hasNext()) {
-        this.writerHelper.write(iterator.next());
+        writerHelper.write(iterator.next());
       }
     }
 
-    List<WriteStatus> writeStatuses = 
this.writerHelper.getWriteStatuses(this.taskID);
+    List<WriteStatus> writeStatuses = 
writerHelper.getWriteStatuses(this.taskID);
     collector.collect(new ClusteringCommitEvent(instantTime, writeStatuses, 
this.taskID));
-    this.writerHelper = null;
-  }
-
-  private void initWriterHelper(String clusteringInstantTime) {
-    if (this.writerHelper == null) {
-      this.writerHelper = new BulkInsertWriterHelper(this.conf, this.table, 
this.writeConfig,
-          clusteringInstantTime, this.taskID, 
getRuntimeContext().getNumberOfParallelSubtasks(), 
getRuntimeContext().getAttemptNumber(),
-          this.rowType);
-    }
+    writerHelper.close();
   }
 
   /**
@@ -338,13 +328,13 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
         .toArray();
   }
 
-  private void initSorter() {
+  private BinaryExternalSorter initSorter() {
     ClassLoader cl = getContainingTask().getUserCodeClassLoader();
     NormalizedKeyComputer computer = 
createSortCodeGenerator().generateNormalizedKeyComputer("SortComputer").newInstance(cl);
     RecordComparator comparator = 
createSortCodeGenerator().generateRecordComparator("SortComparator").newInstance(cl);
 
     MemoryManager memManager = 
getContainingTask().getEnvironment().getMemoryManager();
-    this.sorter =
+    BinaryExternalSorter sorter =
         new BinaryExternalSorter(
             this.getContainingTask(),
             memManager,
@@ -355,12 +345,13 @@ public class ClusteringOperator extends 
TableStreamOperator<ClusteringCommitEven
             computer,
             comparator,
             getContainingTask().getJobConfiguration());
-    this.sorter.startThreads();
+    sorter.startThreads();
 
     // register the metrics.
     getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) 
sorter::getUsedMemoryInBytes);
     getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) 
sorter::getNumSpillFiles);
     getMetricGroup().gauge("spillInBytes", (Gauge<Long>) 
sorter::getSpillInBytes);
+    return sorter;
   }
 
   private SortCodeGenerator createSortCodeGenerator() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 1e4f82a957..82761adf73 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -401,7 +401,7 @@ public class Pipelines {
    * @return the clustering pipeline
    */
   public static DataStreamSink<ClusteringCommitEvent> cluster(Configuration 
conf, RowType rowType, DataStream<Object> dataStream) {
-    return dataStream.transform("cluster_plan_generate",
+    DataStream<ClusteringCommitEvent> clusteringStream = 
dataStream.transform("cluster_plan_generate",
             TypeInformation.of(ClusteringPlanEvent.class),
             new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
@@ -413,8 +413,12 @@ public class Pipelines {
         .transform("clustering_task",
             TypeInformation.of(ClusteringCommitEvent.class),
             new ClusteringOperator(conf, rowType))
-        .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS))
-        .addSink(new ClusteringCommitSink(conf))
+        .setParallelism(conf.getInteger(FlinkOptions.CLUSTERING_TASKS));
+    if (OptionsResolver.sortClusteringEnabled(conf)) {
+      ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
+          conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
+    }
+    return clusteringStream.addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
        .setParallelism(1); // clustering commit should be singleton
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 6ee23727d0..e6d2ddb7b5 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -158,10 +158,22 @@ public class ITTestDataStreamWrite extends TestLogger {
 
   @Test
   public void testWriteCopyOnWriteWithClustering() throws Exception {
+    testWriteCopyOnWriteWithClustering(false);
+  }
+
+  @Test
+  public void testWriteCopyOnWriteWithSortClustering() throws Exception {
+    testWriteCopyOnWriteWithClustering(true);
+  }
+
+  private void testWriteCopyOnWriteWithClustering(boolean 
sortClusteringEnabled) throws Exception {
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true);
     conf.setInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS, 1);
     conf.setString(FlinkOptions.OPERATION, "insert");
+    if (sortClusteringEnabled) {
+      conf.setString(FlinkOptions.CLUSTERING_SORT_COLUMNS, "uuid");
+    }
 
     testWriteToHoodieWithCluster(conf, "cow_write_with_cluster", 1, EXPECTED);
   }

Reply via email to