wzhero1 commented on code in PR #7027:
URL: https://github.com/apache/paimon/pull/7027#discussion_r2922764789
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java:
##########
@@ -50,12 +95,168 @@ public ExpireSnapshotsAction(
this.olderThan = olderThan;
this.maxDeletes = maxDeletes;
this.options = options;
+ this.parallelism = parallelism;
}
+ /** Returns true if forceStartFlinkJob is enabled and parallelism is greater than 1. */
+ private boolean isParallelMode() {
+ return forceStartFlinkJob && parallelism != null && parallelism > 1;
+ }
+
+ @Override
+ public void run() throws Exception {
+ if (parallelism != null && parallelism > 1 && !forceStartFlinkJob) {
+ throw new IllegalArgumentException(
+ "Parallel expire mode requires both --parallelism > 1 and --force_start_flink_job enabled.");
+ }
+ if (isParallelMode()) {
+ // Parallel mode: build and execute Flink job (multi parallelism)
+ build();
+ execute(this.getClass().getSimpleName());
+ } else if (forceStartFlinkJob) {
+ // Serial mode but forced to run as Flink job (single parallelism)
+ super.run();
+ } else {
+ // Serial mode: execute locally
+ executeLocally();
+ }
+ }
+
+ @Override
public void executeLocally() throws Exception {
ExpireSnapshotsProcedure expireSnapshotsProcedure = new
ExpireSnapshotsProcedure();
expireSnapshotsProcedure.withCatalog(catalog);
expireSnapshotsProcedure.call(
null, database + "." + table, retainMax, retainMin, olderThan,
maxDeletes, options);
}
+
+ @Override
+ public void build() throws Exception {
+ if (!isParallelMode()) {
+ // Not in parallel mode, nothing to build
+ return;
+ }
+
+ // Prepare table and config using shared method
+ Pair<FileStoreTable, ExpireConfig> prepared =
+ resolveExpireTableAndConfig(
Review Comment:
done
##########
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpireSnapshotsAction.java:
##########
@@ -50,12 +95,168 @@ public ExpireSnapshotsAction(
this.olderThan = olderThan;
this.maxDeletes = maxDeletes;
this.options = options;
+ this.parallelism = parallelism;
}
+ /** Returns true if forceStartFlinkJob is enabled and parallelism is greater than 1. */
+ private boolean isParallelMode() {
+ return forceStartFlinkJob && parallelism != null && parallelism > 1;
+ }
+
+ @Override
+ public void run() throws Exception {
+ if (parallelism != null && parallelism > 1 && !forceStartFlinkJob) {
+ throw new IllegalArgumentException(
+ "Parallel expire mode requires both --parallelism > 1 and --force_start_flink_job enabled.");
+ }
+ if (isParallelMode()) {
+ // Parallel mode: build and execute Flink job (multi parallelism)
+ build();
+ execute(this.getClass().getSimpleName());
+ } else if (forceStartFlinkJob) {
+ // Serial mode but forced to run as Flink job (single parallelism)
+ super.run();
+ } else {
+ // Serial mode: execute locally
+ executeLocally();
+ }
+ }
+
+ @Override
public void executeLocally() throws Exception {
ExpireSnapshotsProcedure expireSnapshotsProcedure = new
ExpireSnapshotsProcedure();
expireSnapshotsProcedure.withCatalog(catalog);
expireSnapshotsProcedure.call(
null, database + "." + table, retainMax, retainMin, olderThan,
maxDeletes, options);
}
+
+ @Override
+ public void build() throws Exception {
+ if (!isParallelMode()) {
+ // Not in parallel mode, nothing to build
+ return;
+ }
+
+ // Prepare table and config using shared method
+ Pair<FileStoreTable, ExpireConfig> prepared =
+ resolveExpireTableAndConfig(
+ catalog.getTable(Identifier.fromString(database + "." + table)),
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]