Carl-Zhou-CN commented on code in PR #10094:
URL: https://github.com/apache/seatunnel/pull/10094#discussion_r2548891644
##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java:
##########
@@ -77,21 +77,35 @@ public StreamExecutionEnvironment getStreamExecutionEnvironment() {
     }
     protected void setCheckpoint() {
-        if (jobMode == JobMode.BATCH) {
-            log.warn(
-                    "Disabled Checkpointing. In flink execution environment, checkpointing is not supported and not needed when executing jobs in BATCH mode");
-        }
-        long interval;
+        long interval = 10000L;
+        boolean hasExplicitInterval = false;
         if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
             interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
+            hasExplicitInterval = true;
         } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
             log.warn(
                     "the parameter 'execution.checkpoint.interval' will be deprecated, please use common parameter 'checkpoint.interval' to set it");
             interval = config.getLong(ConfigKeyName.CHECKPOINT_INTERVAL);
-        } else {
+        }
+
+        if (hasExplicitInterval && interval <= 0) {
+            log.warn(
+                    "checkpoint.interval is set to {} which is not positive, checkpoint will be disabled for batch job and default interval will be used for streaming job.",
+                    interval);
             interval = 10000L;
         }
+        boolean enableCheckpoint =
+                JobMode.STREAMING.equals(jobMode) || (hasExplicitInterval && interval > 0);
+
+        if (!enableCheckpoint) {
+            if (jobMode == JobMode.BATCH) {
+                log.info(
+                        "Checkpoint is disabled for batch job because 'checkpoint.interval' is not set or <= 0.");
+            }
+            return;
Review Comment:
It seems this `return` should be inside the `if`.
##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java:
##########
@@ -135,11 +135,21 @@ public void execute() throws TaskExecuteException {
                     "Flink Execution Plan: {}",
                     flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
             LOGGER.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
-            if (!flinkRuntimeEnvironment.isStreaming()) {
-                flinkRuntimeEnvironment
-                        .getStreamExecutionEnvironment()
-                        .setRuntimeMode(RuntimeExecutionMode.BATCH);
-                LOGGER.info("Flink job Mode: {}", JobMode.BATCH);
+            if (flinkRuntimeEnvironment.getJobMode() == JobMode.BATCH) {
+                boolean enableCheckpointForBatch =
+                        flinkRuntimeEnvironment
+                                        .getConfig()
+                                        .hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())
+                                && flinkRuntimeEnvironment
+                                                .getConfig()
+                                                .getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key())
+                                        > 0;
Review Comment:
I think this check is too complicated. Can we consolidate the whole decision in one place?
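One possible reading of this suggestion, as a minimal sketch and not the PR's actual code: move the decision into a single helper that both `setCheckpoint()` and `execute()` could call. The names `CheckpointDecision` and `isCheckpointEnabled`, and the local `JobMode` enum, are hypothetical stand-ins so the example compiles on its own. The rule is the one from the `setCheckpoint()` hunk: streaming jobs always checkpoint, batch jobs only with an explicit positive interval.

```java
import java.util.OptionalLong;

final class CheckpointDecision {
    /** Local stand-in for SeaTunnel's JobMode, only so this sketch is self-contained. */
    enum JobMode { BATCH, STREAMING }

    private CheckpointDecision() {}

    /**
     * Hypothetical helper: returns true when checkpointing should be enabled.
     * An empty explicitInterval means no interval was configured.
     */
    static boolean isCheckpointEnabled(JobMode jobMode, OptionalLong explicitInterval) {
        boolean hasPositiveInterval =
                explicitInterval.isPresent() && explicitInterval.getAsLong() > 0;
        return jobMode == JobMode.STREAMING || hasPositiveInterval;
    }
}
```

With something like this, the chained `hasPath(...) && getLong(...) > 0` expression in `execute()` would reduce to a single call, and the two classes could not drift apart.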
##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java:
##########
@@ -77,21 +77,35 @@ public StreamExecutionEnvironment getStreamExecutionEnvironment() {
     }
     protected void setCheckpoint() {
-        if (jobMode == JobMode.BATCH) {
-            log.warn(
-                    "Disabled Checkpointing. In flink execution environment, checkpointing is not supported and not needed when executing jobs in BATCH mode");
-        }
-        long interval;
+        long interval = 10000L;
+        boolean hasExplicitInterval = false;
         if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
             interval = config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
+            hasExplicitInterval = true;
         } else if (config.hasPath(ConfigKeyName.CHECKPOINT_INTERVAL)) {
Review Comment:
The two branches should behave exactly the same.
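A minimal sketch of what "exactly the same" could mean here, assuming the intent is that an interval set through the deprecated key also counts as explicit. `CheckpointIntervalResolver`, `resolveExplicitInterval`, and the `Map<String, Long>` standing in for the real config object are hypothetical; the two key strings are taken from the deprecation warning in the diff.

```java
import java.util.Map;
import java.util.OptionalLong;

final class CheckpointIntervalResolver {
    // Key strings as they appear in the deprecation warning in the diff.
    static final String COMMON_KEY = "checkpoint.interval";
    static final String DEPRECATED_KEY = "execution.checkpoint.interval";

    private CheckpointIntervalResolver() {}

    /**
     * Hypothetical helper: returns the configured interval from either key,
     * preferring the common key. Both keys yield an "explicit" result, so
     * callers treat them identically. The Map is a stand-in for the real config.
     */
    static OptionalLong resolveExplicitInterval(Map<String, Long> config) {
        if (config.containsKey(COMMON_KEY)) {
            return OptionalLong.of(config.get(COMMON_KEY));
        }
        if (config.containsKey(DEPRECATED_KEY)) {
            // The real code would also log the deprecation warning here.
            return OptionalLong.of(config.get(DEPRECATED_KEY));
        }
        return OptionalLong.empty();
    }
}
```

Returning one value for both keys removes the need for a flag that only one branch sets.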
##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractFlinkRuntimeEnvironment.java:
##########
@@ -77,21 +77,35 @@ public StreamExecutionEnvironment getStreamExecutionEnvironment() {
     }
     protected void setCheckpoint() {
-        if (jobMode == JobMode.BATCH) {
-            log.warn(
-                    "Disabled Checkpointing. In flink execution environment, checkpointing is not supported and not needed when executing jobs in BATCH mode");
-        }
-        long interval;
+        long interval = 10000L;
Review Comment:
This value appears many times; it seems we need a constant for it.
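For illustration only, a sketch of how the repeated literal could be pulled into a constant. The value `10000L` comes from the diff; the class name `CheckpointDefaults`, the constant name, and the `orDefault` helper are hypothetical, and the unit is assumed to be whatever `checkpoint.interval` already uses.

```java
final class CheckpointDefaults {
    /** Value taken from the diff; the name is an assumption, not the project's. */
    static final long DEFAULT_CHECKPOINT_INTERVAL = 10000L;

    private CheckpointDefaults() {}

    /** Hypothetical helper: falls back to the default when the interval is not positive. */
    static long orDefault(long interval) {
        return interval > 0 ? interval : DEFAULT_CHECKPOINT_INTERVAL;
    }
}
```

Both the initial assignment and the fallback after the `interval <= 0` check could then reference the same name.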