morningman commented on code in PR #60063:
URL: https://github.com/apache/doris/pull/60063#discussion_r2715237928


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java:
##########
@@ -234,6 +237,11 @@ private ConnectContext buildConnectContext() {
         // Clone session variables from parent
         taskContext.setSessionVariable(VariableMgr.cloneSessionVariable(connectContext.getSessionVariable()));
 
+        // Calculate optimal parallelism and determine distribution strategy
+        RewriteStrategy strategy = calculateRewriteStrategy();
+        // Pipeline engine uses parallelPipelineTaskNum to control instance parallelism.
+        taskContext.getSessionVariable().parallelPipelineTaskNum = strategy.parallelism;

Review Comment:
   Are we overwriting the session variable with a new value here?



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteDataFileExecutor.java:
##########
@@ -69,6 +69,10 @@ public RewriteResult executeGroupsConcurrently(List<RewriteDataGroup> groups, lo
         List<RewriteGroupTask> tasks = Lists.newArrayList();
         RewriteResultCollector resultCollector = new RewriteResultCollector(groups.size(), tasks);
 
+        // Get available BE count once before creating tasks
+        // This avoids calling getBackendsNumber() in each task during multi-threaded execution
+        int availableBeCount = Env.getCurrentSystemInfo().getBackendsNumber(true);

Review Comment:
   We need to take the compute group into account when counting available BEs.



##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java:
##########
@@ -252,9 +260,78 @@ private ConnectContext buildConnectContext() {
         statementContext.setConnectContext(taskContext);
         taskContext.setStatementContext(statementContext);
 
+        // Set GATHER distribution flag if needed (for small data rewrite)
+        statementContext.setUseGatherForIcebergRewrite(strategy.useGather);
+
         return taskContext;
     }
 
+    /**
+     * Calculate optimal rewrite strategy including parallelism and distribution mode.
+     *
+     * The core idea is to precisely control the number of output files:
+     * 1. Calculate expected file count based on data size and target file size
+     * 2. If expected file count is small (e.g., <= threshold), use GATHER to collect
+     *    data to a single node, ensuring minimal file output
+     * 3. Otherwise, limit parallelism to avoid too many writers
+     *
+     * @return RewriteStrategy containing parallelism and distribution settings
+     */
+    private RewriteStrategy calculateRewriteStrategy() {
+        // 1. Calculate expected output file count based on data size
+        long totalSize = group.getTotalSize();
+        int expectedFileCount = (int) Math.ceil((double) totalSize / targetFileSizeBytes);
+
+        // 2. Use available BE count passed from constructor
+        int availableBeCount = this.availableBeCount;
+
+        // 3. Get default parallelism from session variable
+        int defaultParallelism = connectContext.getSessionVariable().getParallelExecInstanceNum();
+
+        // 4. Determine strategy based on expected file count
+        boolean useGather = false;
+        int optimalParallelism;
+
+        // Threshold for using GATHER distribution
+        // When expected files <= this threshold, collect all data to single node
+        final int gatherThreshold = 1;
+
+        if (expectedFileCount <= gatherThreshold) {
+            // Small data volume: use GATHER to write to single node
+            useGather = true;
+            optimalParallelism = 1;
+            LOG.info("[Rewrite Task] taskId: {}, using GATHER distribution for small data. "

Review Comment:
   This log can be removed; the one at line 313 is sufficient.
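
   For reference, the decision described in the Javadoc above can be sketched in isolation as follows. This is only an illustration under stated assumptions, not the PR's code: the class `RewriteStrategySketch`, its nested `Strategy` holder, and the `calculate` signature are hypothetical, and the non-GATHER branch is a guess, since the hunk quoted here is cut off before that branch. The sketch assumes parallelism is capped by both the session default and the expected files per BE.

```java
/** Hypothetical, standalone sketch of the strategy calculation; not the PR's implementation. */
final class RewriteStrategySketch {

    /** Hypothetical holder mirroring the two fields the diff reads: parallelism and useGather. */
    static final class Strategy {
        final int parallelism;
        final boolean useGather;

        Strategy(int parallelism, boolean useGather) {
            this.parallelism = parallelism;
            this.useGather = useGather;
        }
    }

    static Strategy calculate(long totalSizeBytes, long targetFileSizeBytes,
                              int availableBeCount, int defaultParallelism) {
        if (targetFileSizeBytes <= 0) {
            throw new IllegalArgumentException("targetFileSizeBytes must be positive");
        }
        // Step 1: expected output file count, using integer ceiling division.
        long expectedFileCount = (totalSizeBytes + targetFileSizeBytes - 1) / targetFileSizeBytes;

        // Step 2: small data volume, so gather everything onto one writer.
        final int gatherThreshold = 1;
        if (expectedFileCount <= gatherThreshold) {
            return new Strategy(1, true);
        }

        // Step 3 (assumption, not shown in the quoted hunk): spread the expected files
        // across the available BEs and never exceed the session's default parallelism.
        int beCount = Math.max(1, availableBeCount);
        long filesPerBe = (expectedFileCount + beCount - 1) / beCount;
        int parallelism = (int) Math.max(1, Math.min(defaultParallelism, filesPerBe));
        return new Strategy(parallelism, false);
    }
}
```

   With a single return value like this, the caller can log the chosen strategy once after the calculation, so a separate log line inside the GATHER branch is not needed.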



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

