This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3c13275ed9 [Feature] [api env] Add job-level configuration for 
checkpoint timeout. (#5222)
3c13275ed9 is described below

commit 3c13275ed92d6ee7d2715203b223bba262554a48
Author: ic4y <[email protected]>
AuthorDate: Mon Aug 28 16:15:42 2023 +0800

    [Feature] [api env] Add job-level configuration for checkpoint timeout. 
(#5222)
---
 docs/en/connector-v2/sink/Console.md               | 16 ++++-
 .../apache/seatunnel/api/env/EnvCommonOptions.java |  6 ++
 .../apache/seatunnel/api/env/EnvOptionRule.java    |  1 +
 .../seatunnel/console/sink/ConsoleSink.java        | 17 ++++-
 .../seatunnel/console/sink/ConsoleSinkFactory.java | 23 +++++-
 .../seatunnel/console/sink/ConsoleSinkWriter.java  | 36 +++++++---
 .../console/sink/ConsoleSinkWriterIT.java          |  2 +-
 .../flink/execution/FlinkRuntimeEnvironment.java   |  5 +-
 .../flink/execution/FlinkRuntimeEnvironment.java   |  5 +-
 .../server/checkpoint/CheckpointCloseReason.java   |  2 +-
 .../seatunnel/engine/server/master/JobMaster.java  |  5 ++
 .../server/checkpoint/CheckpointTimeOutTest.java   | 83 ++++++++++++++++++++++
 .../stream_fake_to_console_checkpointTimeOut.conf  | 54 ++++++++++++++
 13 files changed, 236 insertions(+), 19 deletions(-)

diff --git a/docs/en/connector-v2/sink/Console.md 
b/docs/en/connector-v2/sink/Console.md
index fd7623d7d3..55df281b27 100644
--- a/docs/en/connector-v2/sink/Console.md
+++ b/docs/en/connector-v2/sink/Console.md
@@ -14,14 +14,24 @@ Used to send data to Console. Both support streaming and 
batch mode.
 
 ## Options
 
-|      name      | type | required | default value |
-|----------------|------|----------|---------------|
-| common-options |      | no       | -             |
+|        name        |  type   | required | default value |
+|--------------------|---------|----------|---------------|
+| common-options     |         | no       | -             |
+| log.print.data     | boolean | no       | yes           |
+| log.print.delay.ms | int     | no       | 0             |
 
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
 
+### log.print.data
+
+Flag to determine whether data should be printed in the logs. The default 
value is `true`.
+
+### log.print.delay.ms
+
+Delay in milliseconds between printing each data item to the logs. The default 
value is `0`.
+
 ## Example
 
 simple:
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index bc80c66428..d076cd5367 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -51,6 +51,12 @@ public interface EnvCommonOptions {
                     .withDescription(
                             "The interval (in milliseconds) between two 
consecutive checkpoints.");
 
+    Option<Long> CHECKPOINT_TIMEOUT =
+            Options.key("checkpoint.timeout")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("The timeout (in milliseconds) for a 
checkpoint.");
+
     Option<String> JARS =
             Options.key("jars")
                     .stringType()
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index 3a90b82e83..09310f080c 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -30,6 +30,7 @@ public class EnvOptionRule {
                         CommonOptions.PARALLELISM,
                         EnvCommonOptions.JARS,
                         EnvCommonOptions.CHECKPOINT_INTERVAL,
+                        EnvCommonOptions.CHECKPOINT_TIMEOUT,
                         EnvCommonOptions.CUSTOM_PARAMETERS)
                 .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 036a5d802f..49957b99e2 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.console.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -30,13 +31,20 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 import com.google.auto.service.AutoService;
 import lombok.NoArgsConstructor;
 
+import static 
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
+import static 
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;
+
 @NoArgsConstructor
 @AutoService(SeaTunnelSink.class)
 public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
     private SeaTunnelRowType seaTunnelRowType;
+    private boolean isPrintData = true;
+    private int delayMs = 0;
 
-    public ConsoleSink(SeaTunnelRowType seaTunnelRowType) {
+    public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig 
options) {
         this.seaTunnelRowType = seaTunnelRowType;
+        this.isPrintData = options.get(LOG_PRINT_DATA);
+        this.delayMs = options.get(LOG_PRINT_DELAY);
     }
 
     @Override
@@ -51,7 +59,7 @@ public class ConsoleSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
 
     @Override
     public AbstractSinkWriter<SeaTunnelRow, Void> 
createWriter(SinkWriter.Context context) {
-        return new ConsoleSinkWriter(seaTunnelRowType, context);
+        return new ConsoleSinkWriter(seaTunnelRowType, context, isPrintData, 
delayMs);
     }
 
     @Override
@@ -60,5 +68,8 @@ public class ConsoleSink extends 
AbstractSimpleSink<SeaTunnelRow, Void> {
     }
 
     @Override
-    public void prepare(Config pluginConfig) {}
+    public void prepare(Config pluginConfig) {
+        this.isPrintData = 
ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DATA);
+        this.delayMs = 
ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DELAY);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index 1e0450d66c..5a66493aee 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
@@ -27,6 +30,21 @@ import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
 public class ConsoleSinkFactory implements TableSinkFactory {
+
+    public static final Option<Boolean> LOG_PRINT_DATA =
+            Options.key("log.print.data")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Flag to determine whether data should be printed 
in the logs.");
+
+    public static final Option<Integer> LOG_PRINT_DELAY =
+            Options.key("log.print.delay.ms")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription(
+                            "Delay in milliseconds between printing each data 
item to the logs.");
+
     @Override
     public String factoryIdentifier() {
         return "Console";
@@ -39,7 +57,10 @@ public class ConsoleSinkFactory implements TableSinkFactory {
 
     @Override
     public TableSink createSink(TableFactoryContext context) {
+        ReadonlyConfig options = context.getOptions();
         return () ->
-                new 
ConsoleSink(context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+                new ConsoleSink(
+                        
context.getCatalogTable().getTableSchema().toPhysicalRowDataType(),
+                        options);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index fc3f7f232c..c8c6c945ff 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
 
 import org.apache.commons.lang3.StringUtils;
@@ -44,9 +45,18 @@ public class ConsoleSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
     private final SinkWriter.Context context;
     private final DataTypeChangeEventHandler dataTypeChangeEventHandler;
 
-    public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType, 
SinkWriter.Context context) {
+    boolean isPrintData = true;
+    int delayMs = 0;
+
+    public ConsoleSinkWriter(
+            SeaTunnelRowType seaTunnelRowType,
+            SinkWriter.Context context,
+            boolean isPrintData,
+            int delayMs) {
         this.seaTunnelRowType = seaTunnelRowType;
         this.context = context;
+        this.isPrintData = isPrintData;
+        this.delayMs = delayMs;
         this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
         log.info("output rowType: {}", fieldsInfo(seaTunnelRowType));
     }
@@ -66,13 +76,23 @@ public class ConsoleSinkWriter extends 
AbstractSinkWriter<SeaTunnelRow, Void> {
         for (int i = 0; i < fieldTypes.length; i++) {
             arr[i] = fieldToString(fieldTypes[i], fields[i]);
         }
-        log.info(
-                "subtaskIndex={}  rowIndex={}:  SeaTunnelRow#tableId={} 
SeaTunnelRow#kind={} : {}",
-                context.getIndexOfSubtask(),
-                rowCounter.incrementAndGet(),
-                element.getTableId(),
-                element.getRowKind(),
-                StringUtils.join(arr, ", "));
+        if (isPrintData) {
+            log.info(
+                    "subtaskIndex={}  rowIndex={}:  SeaTunnelRow#tableId={} 
SeaTunnelRow#kind={} : {}",
+                    context.getIndexOfSubtask(),
+                    rowCounter.incrementAndGet(),
+                    element.getTableId(),
+                    element.getRowKind(),
+                    StringUtils.join(arr, ", "));
+        }
+        if (delayMs > 0) {
+            try {
+                Thread.sleep(delayMs);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new SeaTunnelException(e);
+            }
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
 
b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
index 0220c88962..e03c00c495 100644
--- 
a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
+++ 
b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
@@ -48,7 +48,7 @@ public class ConsoleSinkWriterIT {
         String[] fieldNames = {};
         SeaTunnelDataType<?>[] fieldTypes = {};
         SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, 
fieldTypes);
-        consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null);
+        consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null, 
true, 0);
     }
 
     private Object fieldToStringTest(SeaTunnelDataType<?> dataType, Object 
value) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 7fb75064a4..34aa7ee4f2 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -265,7 +265,10 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
                 }
             }
 
-            if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
+            if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+                long timeout = 
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
+                checkpointConfig.setCheckpointTimeout(timeout);
+            } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
                 long timeout = 
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
                 checkpointConfig.setCheckpointTimeout(timeout);
             }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 4b5bef07cb..583a1cf3e5 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -265,7 +265,10 @@ public class FlinkRuntimeEnvironment implements 
RuntimeEnvironment {
                 }
             }
 
-            if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
+            if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+                long timeout = 
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
+                checkpointConfig.setCheckpointTimeout(timeout);
+            } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
                 long timeout = 
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
                 checkpointConfig.setCheckpointTimeout(timeout);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index 9f35f62fd6..c07f10fb1c 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
 public enum CheckpointCloseReason {
     PIPELINE_END("Pipeline turn to end state."),
     CHECKPOINT_EXPIRED(
-            "Checkpoint expired before completing. Please increase checkpoint 
timeout in the seatunnel.yaml"),
+            "Checkpoint expired before completing. Please increase checkpoint 
timeout in the seatunnel.yaml or jobConfig env."),
     CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
     CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
     CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 5b329dbff8..6246831843 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -278,6 +278,11 @@ public class JobMaster {
                     Long.parseLong(
                             
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString()));
         }
+        if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+            jobCheckpointConfig.setCheckpointTimeout(
+                    Long.parseLong(
+                            
jobEnv.get(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()).toString()));
+        }
         return jobCheckpointConfig;
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
new file mode 100644
index 0000000000..ed2b4dbb6d
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.internal.serialization.Data;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+public class CheckpointTimeOutTest extends AbstractSeaTunnelServerTest {
+
+    public static String CONF_PATH = 
"stream_fake_to_console_checkpointTimeOut.conf";
+    public static long JOB_ID = System.currentTimeMillis();
+
+    @Test
+    public void testJobLevelCheckpointTimeOut() {
+        startJob(JOB_ID, CONF_PATH);
+
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertTrue(
+                                    server.getCoordinatorService()
+                                            .getJobStatus(JOB_ID)
+                                            .equals(JobStatus.RUNNING));
+                        });
+
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            Assertions.assertTrue(
+                                    server.getCoordinatorService()
+                                            .getJobStatus(JOB_ID)
+                                            .equals(JobStatus.FAILED));
+                        });
+    }
+
+    private void startJob(Long jobid, String path) {
+        LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, 
jobid.toString(), jobid);
+
+        JobImmutableInformation jobImmutableInformation =
+                new JobImmutableInformation(
+                        jobid,
+                        "Test",
+                        false,
+                        
nodeEngine.getSerializationService().toData(testLogicalDag),
+                        testLogicalDag.getJobConfig(),
+                        Collections.emptyList());
+
+        Data data = 
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+                server.getCoordinatorService().submitJob(jobid, data);
+        voidPassiveCompletableFuture.join();
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpointTimeOut.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpointTimeOut.conf
new file mode 100644
index 0000000000..2d541ac2ac
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpointTimeOut.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 1000
+  checkpoint.timeout = 100
+}
+
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+    FakeSource {
+      result_table_name = "fake1"
+       row.num = 1000
+       split.num = 100
+       split.read-interval = 3000
+       parallelism = 1
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+}
+
+transform {
+}
+
+sink {
+  console {
+  log.print.delay.ms=5000
+  }
+}
\ No newline at end of file

Reply via email to