wzhero1 commented on code in PR #7027:
URL: https://github.com/apache/paimon/pull/7027#discussion_r3013704053


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java:
##########
@@ -50,12 +95,139 @@ public ExpireSnapshotsAction(
         this.olderThan = olderThan;
         this.maxDeletes = maxDeletes;
         this.options = options;
+        this.parallelism = resolveParallelism(parallelism);
+    }
+
+    private int resolveParallelism(Integer parallelism) {
+        if (parallelism != null) {
+            return parallelism;
+        }
+        int envParallelism = env.getParallelism();
+        return envParallelism > 0 ? envParallelism : 1;
     }
 
+    @Override
+    public void run() throws Exception {
+        if (forceStartFlinkJob || parallelism > 1) {
+            // Parallel mode: build custom multi-parallelism Flink pipeline
+            build();
+            if (!env.getTransformations().isEmpty()) {
+                execute(this.getClass().getSimpleName());
+            }
+        } else {
+            // Default: ActionBase handles LocalAction → executeLocally()
+            super.run();
+        }
+    }
+
+    @Override
     public void executeLocally() throws Exception {
         ExpireSnapshotsProcedure expireSnapshotsProcedure = new 
ExpireSnapshotsProcedure();
         expireSnapshotsProcedure.withCatalog(catalog);
         expireSnapshotsProcedure.call(
                 null, database + "." + table, retainMax, retainMin, olderThan, 
maxDeletes, options);
     }
+
+    @Override
+    public void build() throws Exception {
+        Identifier identifier = new Identifier(database, table);
+
+        // Prepare table with dynamic options
+        HashMap<String, String> dynamicOptions = new HashMap<>();
+        ProcedureUtils.putAllOptions(dynamicOptions, options);
+        FileStoreTable fileStoreTable =
+                (FileStoreTable) 
catalog.getTable(identifier).copy(dynamicOptions);
+
+        // Build expire config
+        CoreOptions tableOptions = fileStoreTable.store().options();
+        ExpireConfig expireConfig =
+                ProcedureUtils.fillInSnapshotOptions(
+                                tableOptions, retainMax, retainMin, olderThan, 
maxDeletes)
+                        .build();
+
+        // Create planner using factory method
+        ExpireSnapshotsPlanner planner = 
ExpireSnapshotsPlanner.create(fileStoreTable);
+
+        // Plan the expiration
+        ExpireSnapshotsPlan plan = planner.plan(expireConfig);
+        if (plan.isEmpty()) {
+            LOG.info("No snapshots to expire, skipping Flink job submission.");
+            return;
+        }
+
+        LOG.info(
+                "Planning to expire {} snapshots, range=[{}, {})",
+                plan.endExclusiveId() - plan.beginInclusiveId(),
+                plan.beginInclusiveId(),
+                plan.endExclusiveId());
+
+        // Build worker phase
+        DataStream<DeletionReport> reports = buildWorkerPhase(plan, 
identifier, expireConfig);
+
+        // Build sink phase
+        buildSinkPhase(reports, plan, identifier, expireConfig);
+    }
+
+    /**
+     * Build the worker phase of the Flink job.
+     *
+     * <p>Workers process data file and changelog file deletion tasks in 
parallel. Tasks are
+     * partitioned by snapshot range to ensure:
+     *
+     * <ul>
+     *   <li>Same snapshot range tasks are processed by the same worker
+     *   <li>Within each worker, data files are deleted before changelog files
+     *   <li>Cache locality is maximized (adjacent snapshots often share 
manifest files)
+     * </ul>
+     */
+    private DataStream<DeletionReport> buildWorkerPhase(
+            ExpireSnapshotsPlan plan, Identifier identifier, ExpireConfig 
expireConfig) {
+        // Partition by snapshot range: each worker gets a contiguous range of 
snapshots
+        // with dataFileTasks first, then changelogFileTasks
+        List<List<SnapshotExpireTask>> partitionedGroups =
+                plan.partitionTasksBySnapshotRange(parallelism);
+
+        DataStreamSource<List<SnapshotExpireTask>> source =
+                env.fromCollection(partitionedGroups).setParallelism(1);
+
+        return source.rebalance()
+                .flatMap(
+                        new RangePartitionedExpireFunction(
+                                catalogOptions.toMap(),
+                                identifier,
+                                plan.protectionSet().taggedSnapshots(),
+                                plan.snapshotCache(),
+                                expireConfig.isChangelogDecoupled()))
+                // Use JavaTypeInfo to ensure proper Java serialization of 
DeletionReport,
+                // avoiding Kryo's FieldSerializer which cannot handle 
BinaryRow correctly.
+                // This approach is compatible with both Flink 1.x and 2.x.

Review Comment:
   Done. Removed the parallelism > 1 condition. Now only forceStartFlinkJob 
controls whether to use Flink mode. The DAG stays the same regardless of 
parallelism setting. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to