This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3c13275ed9 [Feature] [api env] Add job-level configuration for
checkpoint timeout. (#5222)
3c13275ed9 is described below
commit 3c13275ed92d6ee7d2715203b223bba262554a48
Author: ic4y <[email protected]>
AuthorDate: Mon Aug 28 16:15:42 2023 +0800
[Feature] [api env] Add job-level configuration for checkpoint timeout.
(#5222)
---
docs/en/connector-v2/sink/Console.md | 16 ++++-
.../apache/seatunnel/api/env/EnvCommonOptions.java | 6 ++
.../apache/seatunnel/api/env/EnvOptionRule.java | 1 +
.../seatunnel/console/sink/ConsoleSink.java | 17 ++++-
.../seatunnel/console/sink/ConsoleSinkFactory.java | 23 +++++-
.../seatunnel/console/sink/ConsoleSinkWriter.java | 36 +++++++---
.../console/sink/ConsoleSinkWriterIT.java | 2 +-
.../flink/execution/FlinkRuntimeEnvironment.java | 5 +-
.../flink/execution/FlinkRuntimeEnvironment.java | 5 +-
.../server/checkpoint/CheckpointCloseReason.java | 2 +-
.../seatunnel/engine/server/master/JobMaster.java | 5 ++
.../server/checkpoint/CheckpointTimeOutTest.java | 83 ++++++++++++++++++++++
.../stream_fake_to_console_checkpointTimeOut.conf | 54 ++++++++++++++
13 files changed, 236 insertions(+), 19 deletions(-)
diff --git a/docs/en/connector-v2/sink/Console.md
b/docs/en/connector-v2/sink/Console.md
index fd7623d7d3..55df281b27 100644
--- a/docs/en/connector-v2/sink/Console.md
+++ b/docs/en/connector-v2/sink/Console.md
@@ -14,14 +14,24 @@ Used to send data to Console. Both support streaming and
batch mode.
## Options
-| name | type | required | default value |
-|----------------|------|----------|---------------|
-| common-options | | no | - |
+| name | type | required | default value |
+|--------------------|---------|----------|---------------|
+| common-options | | no | - |
+| log.print.data | boolean | no | true |
+| log.print.delay.ms | int | no | 0 |
### common options
Sink plugin common parameters, please refer to [Sink Common
Options](common-options.md) for details
+### log.print.data
+
+Flag to determine whether data should be printed in the logs. The default
value is `true`.
+
+### log.print.delay.ms
+
+Delay in milliseconds between printing each data item to the logs. The default
value is `0`.
+
## Example
simple:
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index bc80c66428..d076cd5367 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -51,6 +51,12 @@ public interface EnvCommonOptions {
.withDescription(
"The interval (in milliseconds) between two
consecutive checkpoints.");
+ Option<Long> CHECKPOINT_TIMEOUT =
+ Options.key("checkpoint.timeout")
+ .longType()
+ .noDefaultValue()
+ .withDescription("The timeout (in milliseconds) for a
checkpoint.");
+
Option<String> JARS =
Options.key("jars")
.stringType()
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index 3a90b82e83..09310f080c 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -30,6 +30,7 @@ public class EnvOptionRule {
CommonOptions.PARALLELISM,
EnvCommonOptions.JARS,
EnvCommonOptions.CHECKPOINT_INTERVAL,
+ EnvCommonOptions.CHECKPOINT_TIMEOUT,
EnvCommonOptions.CUSTOM_PARAMETERS)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 036a5d802f..49957b99e2 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.console.sink;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -30,13 +31,20 @@ import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
+import static
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DATA;
+import static
org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSinkFactory.LOG_PRINT_DELAY;
+
@NoArgsConstructor
@AutoService(SeaTunnelSink.class)
public class ConsoleSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
private SeaTunnelRowType seaTunnelRowType;
+ private boolean isPrintData = true;
+ private int delayMs = 0;
- public ConsoleSink(SeaTunnelRowType seaTunnelRowType) {
+ public ConsoleSink(SeaTunnelRowType seaTunnelRowType, ReadonlyConfig
options) {
this.seaTunnelRowType = seaTunnelRowType;
+ this.isPrintData = options.get(LOG_PRINT_DATA);
+ this.delayMs = options.get(LOG_PRINT_DELAY);
}
@Override
@@ -51,7 +59,7 @@ public class ConsoleSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
@Override
public AbstractSinkWriter<SeaTunnelRow, Void>
createWriter(SinkWriter.Context context) {
- return new ConsoleSinkWriter(seaTunnelRowType, context);
+ return new ConsoleSinkWriter(seaTunnelRowType, context, isPrintData,
delayMs);
}
@Override
@@ -60,5 +68,8 @@ public class ConsoleSink extends
AbstractSimpleSink<SeaTunnelRow, Void> {
}
@Override
- public void prepare(Config pluginConfig) {}
+ public void prepare(Config pluginConfig) {
+ this.isPrintData =
ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DATA);
+ this.delayMs =
ReadonlyConfig.fromConfig(pluginConfig).get(LOG_PRINT_DELAY);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index 1e0450d66c..5a66493aee 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.console.sink;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
@@ -27,6 +30,21 @@ import com.google.auto.service.AutoService;
@AutoService(Factory.class)
public class ConsoleSinkFactory implements TableSinkFactory {
+
+ public static final Option<Boolean> LOG_PRINT_DATA =
+ Options.key("log.print.data")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Flag to determine whether data should be printed
in the logs.");
+
+ public static final Option<Integer> LOG_PRINT_DELAY =
+ Options.key("log.print.delay.ms")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ "Delay in milliseconds between printing each data
item to the logs.");
+
@Override
public String factoryIdentifier() {
return "Console";
@@ -39,7 +57,10 @@ public class ConsoleSinkFactory implements TableSinkFactory {
@Override
public TableSink createSink(TableFactoryContext context) {
+ ReadonlyConfig options = context.getOptions();
return () ->
- new
ConsoleSink(context.getCatalogTable().getTableSchema().toPhysicalRowDataType());
+ new ConsoleSink(
+
context.getCatalogTable().getTableSchema().toPhysicalRowDataType(),
+ options);
}
}
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
index fc3f7f232c..c8c6c945ff 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriter.java
@@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.apache.commons.lang3.StringUtils;
@@ -44,9 +45,18 @@ public class ConsoleSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
private final SinkWriter.Context context;
private final DataTypeChangeEventHandler dataTypeChangeEventHandler;
- public ConsoleSinkWriter(SeaTunnelRowType seaTunnelRowType,
SinkWriter.Context context) {
+ boolean isPrintData = true;
+ int delayMs = 0;
+
+ public ConsoleSinkWriter(
+ SeaTunnelRowType seaTunnelRowType,
+ SinkWriter.Context context,
+ boolean isPrintData,
+ int delayMs) {
this.seaTunnelRowType = seaTunnelRowType;
this.context = context;
+ this.isPrintData = isPrintData;
+ this.delayMs = delayMs;
this.dataTypeChangeEventHandler = new DataTypeChangeEventDispatcher();
log.info("output rowType: {}", fieldsInfo(seaTunnelRowType));
}
@@ -66,13 +76,23 @@ public class ConsoleSinkWriter extends
AbstractSinkWriter<SeaTunnelRow, Void> {
for (int i = 0; i < fieldTypes.length; i++) {
arr[i] = fieldToString(fieldTypes[i], fields[i]);
}
- log.info(
- "subtaskIndex={} rowIndex={}: SeaTunnelRow#tableId={}
SeaTunnelRow#kind={} : {}",
- context.getIndexOfSubtask(),
- rowCounter.incrementAndGet(),
- element.getTableId(),
- element.getRowKind(),
- StringUtils.join(arr, ", "));
+ if (isPrintData) {
+ log.info(
+ "subtaskIndex={} rowIndex={}: SeaTunnelRow#tableId={}
SeaTunnelRow#kind={} : {}",
+ context.getIndexOfSubtask(),
+ rowCounter.incrementAndGet(),
+ element.getTableId(),
+ element.getRowKind(),
+ StringUtils.join(arr, ", "));
+ }
+ if (delayMs > 0) {
+ try {
+ Thread.sleep(delayMs);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new SeaTunnelException(e);
+ }
+ }
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
index 0220c88962..e03c00c495 100644
---
a/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
+++
b/seatunnel-connectors-v2/connector-console/src/test/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkWriterIT.java
@@ -48,7 +48,7 @@ public class ConsoleSinkWriterIT {
String[] fieldNames = {};
SeaTunnelDataType<?>[] fieldTypes = {};
SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames,
fieldTypes);
- consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null);
+ consoleSinkWriter = new ConsoleSinkWriter(seaTunnelRowType, null,
true, 0);
}
private Object fieldToStringTest(SeaTunnelDataType<?> dataType, Object
value) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 7fb75064a4..34aa7ee4f2 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -265,7 +265,10 @@ public class FlinkRuntimeEnvironment implements
RuntimeEnvironment {
}
}
- if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
+ if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+ long timeout =
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
+ checkpointConfig.setCheckpointTimeout(timeout);
+ } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
long timeout =
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
checkpointConfig.setCheckpointTimeout(timeout);
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 4b5bef07cb..583a1cf3e5 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -265,7 +265,10 @@ public class FlinkRuntimeEnvironment implements
RuntimeEnvironment {
}
}
- if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
+ if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+ long timeout =
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
+ checkpointConfig.setCheckpointTimeout(timeout);
+ } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
long timeout =
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
checkpointConfig.setCheckpointTimeout(timeout);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index 9f35f62fd6..c07f10fb1c 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
public enum CheckpointCloseReason {
PIPELINE_END("Pipeline turn to end state."),
CHECKPOINT_EXPIRED(
- "Checkpoint expired before completing. Please increase checkpoint
timeout in the seatunnel.yaml"),
+ "Checkpoint expired before completing. Please increase checkpoint
timeout in the seatunnel.yaml or jobConfig env."),
CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 5b329dbff8..6246831843 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -278,6 +278,11 @@ public class JobMaster {
Long.parseLong(
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString()));
}
+ if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+ jobCheckpointConfig.setCheckpointTimeout(
+ Long.parseLong(
+
jobEnv.get(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()).toString()));
+ }
return jobCheckpointConfig;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
new file mode 100644
index 0000000000..ed2b4dbb6d
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointTimeOutTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.internal.serialization.Data;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+public class CheckpointTimeOutTest extends AbstractSeaTunnelServerTest {
+
+ public static String CONF_PATH =
"stream_fake_to_console_checkpointTimeOut.conf";
+ public static long JOB_ID = System.currentTimeMillis();
+
+ @Test
+ public void testJobLevelCheckpointTimeOut() {
+ startJob(JOB_ID, CONF_PATH);
+
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertTrue(
+ server.getCoordinatorService()
+ .getJobStatus(JOB_ID)
+ .equals(JobStatus.RUNNING));
+ });
+
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Assertions.assertTrue(
+ server.getCoordinatorService()
+ .getJobStatus(JOB_ID)
+ .equals(JobStatus.FAILED));
+ });
+ }
+
+ private void startJob(Long jobid, String path) {
+ LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path,
jobid.toString(), jobid);
+
+ JobImmutableInformation jobImmutableInformation =
+ new JobImmutableInformation(
+ jobid,
+ "Test",
+ false,
+
nodeEngine.getSerializationService().toData(testLogicalDag),
+ testLogicalDag.getJobConfig(),
+ Collections.emptyList());
+
+ Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+ server.getCoordinatorService().submitJob(jobid, data);
+ voidPassiveCompletableFuture.join();
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpointTimeOut.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpointTimeOut.conf
new file mode 100644
index 0000000000..2d541ac2ac
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_checkpointTimeOut.conf
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 1000
+ checkpoint.timeout = 100
+}
+
+source {
+ # This is an example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake1"
+ row.num = 1000
+ split.num = 100
+ split.read-interval = 3000
+ parallelism = 1
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ log.print.delay.ms=5000
+ }
+}
\ No newline at end of file