zhangstar333 commented on code in PR #60063:
URL: https://github.com/apache/doris/pull/60063#discussion_r2752337350


##########
fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java:
##########
@@ -252,9 +260,86 @@ private ConnectContext buildConnectContext() {
         statementContext.setConnectContext(taskContext);
         taskContext.setStatementContext(statementContext);
 
+        // Set GATHER distribution flag if needed (for small data rewrite)
+        statementContext.setUseGatherForIcebergRewrite(strategy.useGather);
+
         return taskContext;
     }
 
+    /**
+     * Calculate optimal rewrite strategy including parallelism and 
distribution mode.
+     *
+     * The core idea is to precisely control the number of output files:
+     * 1. Calculate expected file count based on data size and target file size
+     * 2. If expected file count is not greater than available BE count, use 
GATHER
+     *    to collect data to a single node, avoiding excessive writers
+     * 3. Otherwise, limit per-BE parallelism so total writers <= expected 
files
+     *
+     * @return RewriteStrategy containing parallelism and distribution settings
+     */
+    private RewriteStrategy calculateRewriteStrategy() {
+        // 1. Calculate expected output file count based on data size
+        long totalSize = group.getTotalSize();
+        int expectedFileCount = (int) Math.ceil((double) totalSize / 
targetFileSizeBytes);
+
+        // 2. Use available BE count passed from constructor
+        int availableBeCount = this.availableBeCount;
+        int safeBeCount = Math.max(1, availableBeCount);
+
+        // 3. Get default parallelism from session variable (pipeline task num)
+        int defaultParallelism = getDefaultPipelineParallelism();
+
+        // 4. Determine strategy based on expected file count
+        boolean useGather = false;
+        int optimalParallelism;
+
+        // Threshold for using GATHER distribution
+        // When expected files <= available BEs, collect all data to single 
node
+        final int gatherThreshold = safeBeCount;
+
+        if (expectedFileCount <= gatherThreshold) {
+            // Small data volume: use GATHER to write to single node
+            // Keep parallelism <= expected files to avoid extra output files
+            useGather = true;
+            optimalParallelism = Math.max(1, Math.min(defaultParallelism, 
expectedFileCount));
+        } else {
+            // Larger data volume: limit per-BE parallelism so total writers 
<= expected files
+            int maxParallelismByFileCount = Math.max(1, expectedFileCount / 
safeBeCount);
+            optimalParallelism = Math.max(1, Math.min(defaultParallelism, 
maxParallelismByFileCount));
+        }
+
+        LOG.info("[Rewrite Task] taskId: {}, totalSize: {} bytes, 
targetFileSize: {} bytes, "
+                        + "expectedFileCount: {}, availableBeCount: {}, 
defaultParallelism: {}, "
+                        + "optimalParallelism: {}, useGather: {}",
+                taskId, totalSize, targetFileSizeBytes, expectedFileCount,
+                availableBeCount, defaultParallelism, optimalParallelism, 
useGather);
+
+        return new RewriteStrategy(optimalParallelism, useGather);
+    }
+
+    private int getDefaultPipelineParallelism() {

Review Comment:
   It seems this could call SessionVariable#getParallelExecInstanceNum() 
   directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to