wzhero1 commented on code in PR #7027:
URL: https://github.com/apache/paimon/pull/7027#discussion_r2973182531


##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java:
##########
@@ -50,12 +93,135 @@ public ExpireSnapshotsAction(
         this.olderThan = olderThan;
         this.maxDeletes = maxDeletes;
         this.options = options;
+        this.parallelism = resolveParallelism(parallelism);
+    }
+
+    private int resolveParallelism(Integer parallelism) {
+        if (parallelism != null) {
+            return parallelism;
+        }
+        int envParallelism = env.getParallelism();
+        return envParallelism > 0 ? envParallelism : 1;
     }
 
+    @Override
+    public void run() throws Exception {
+        if (forceStartFlinkJob) {
+            // Parallel mode: build custom multi-parallelism Flink pipeline
+            build();
+            execute(this.getClass().getSimpleName());
+        } else {
+            // Default: ActionBase handles LocalAction → executeLocally()
+            super.run();
+        }
+    }
+
+    @Override
     public void executeLocally() throws Exception {
         ExpireSnapshotsProcedure expireSnapshotsProcedure = new ExpireSnapshotsProcedure();
         expireSnapshotsProcedure.withCatalog(catalog);
         expireSnapshotsProcedure.call(
                 null, database + "." + table, retainMax, retainMin, olderThan, maxDeletes, options);
     }
+
+    @Override
+    public void build() throws Exception {
+        Identifier identifier = new Identifier(database, table);
+
+        // Prepare table with dynamic options
+        HashMap<String, String> dynamicOptions = new HashMap<>();
+        ProcedureUtils.putAllOptions(dynamicOptions, options);
+        FileStoreTable fileStoreTable =
+                (FileStoreTable) catalog.getTable(identifier).copy(dynamicOptions);
+
+        // Build expire config
+        CoreOptions tableOptions = fileStoreTable.store().options();
+        ExpireConfig expireConfig =
+                ProcedureUtils.fillInSnapshotOptions(
+                                tableOptions, retainMax, retainMin, olderThan, maxDeletes)
+                        .build();
+
+        // Create planner using factory method
+        ExpireSnapshotsPlanner planner = ExpireSnapshotsPlanner.create(fileStoreTable);
+
+        // Plan the expiration
+        ExpireSnapshotsPlan plan = planner.plan(expireConfig);
+        if (plan.isEmpty()) {
+            LOG.info("No snapshots to expire");
+            return;

Review Comment:
   Prefer to keep the early return here. Submitting an empty Flink job still 
involves cluster resource allocation and scheduling overhead for zero useful 
work. I've improved the log message to make the skip behavior clear to users.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
