This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 3add2126b6 [Chore][Core] Clean flow control code (#5991)
3add2126b6 is described below
commit 3add2126b69a3997a2d626c55d60c873ef01d3cc
Author: Tyrantlucifer <[email protected]>
AuthorDate: Tue Dec 12 15:17:11 2023 +0800
[Chore][Core] Clean flow control code (#5991)
---
.../core/starter/flowcontrol/FlowControlGate.java | 24 +++--
.../starter/flowcontrol/FlowControlStrategy.java | 117 +++++++++++++--------
.../flink/execution/SinkExecuteProcessor.java | 7 +-
.../FlinkAbstractPluginExecuteProcessor.java | 17 +--
.../starter/flink/execution/FlinkExecution.java | 9 +-
.../flink/execution/SinkExecuteProcessor.java | 7 +-
.../flink/execution/SourceExecuteProcessor.java | 11 +-
.../flink/execution/TransformExecuteProcessor.java | 7 +-
.../server/task/SeaTunnelSourceCollector.java | 8 +-
.../engine/server/task/SourceSeaTunnelTask.java | 5 +-
.../flink/source/FlinkRowCollector.java | 10 +-
.../translation/flink/source/FlinkSource.java | 9 +-
.../flink/source/FlinkSourceReader.java | 5 +-
.../spark/serialization/InternalRowCollector.java | 11 +-
14 files changed, 154 insertions(+), 93 deletions(-)
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
index 10b2301329..5d06366e3a 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
@@ -21,19 +21,31 @@ import
org.apache.seatunnel.shade.com.google.common.util.concurrent.RateLimiter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import java.util.Optional;
+
public class FlowControlGate {
- private final RateLimiter bytesRateLimiter;
- private final RateLimiter countRateLimiter;
+ private static final int DEFAULT_VALUE = Integer.MAX_VALUE;
+
+ private final Optional<RateLimiter> bytesRateLimiter;
+ private final Optional<RateLimiter> countRateLimiter;
private FlowControlGate(FlowControlStrategy flowControlStrategy) {
- this.bytesRateLimiter =
RateLimiter.create(flowControlStrategy.getBytesPerSecond());
- this.countRateLimiter =
RateLimiter.create(flowControlStrategy.getCountPreSecond());
+ final int bytesPerSecond = flowControlStrategy.getBytesPerSecond();
+ final int countPreSecond = flowControlStrategy.getCountPreSecond();
+ this.bytesRateLimiter =
+ bytesPerSecond == DEFAULT_VALUE
+ ? Optional.empty()
+ : Optional.of(RateLimiter.create(bytesPerSecond));
+ this.countRateLimiter =
+ countPreSecond == DEFAULT_VALUE
+ ? Optional.empty()
+ : Optional.of(RateLimiter.create(countPreSecond));
}
public void audit(SeaTunnelRow row) {
- bytesRateLimiter.acquire(row.getBytesSize());
- countRateLimiter.acquire();
+ bytesRateLimiter.ifPresent(rateLimiter ->
rateLimiter.acquire(row.getBytesSize()));
+ countRateLimiter.ifPresent(RateLimiter::acquire);
}
public static FlowControlGate create(FlowControlStrategy
flowControlStrategy) {
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
index 9ab0d041fd..923dccee0d 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
@@ -17,21 +17,21 @@
package org.apache.seatunnel.core.starter.flowcontrol;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-
-import lombok.Getter;
-import lombok.Setter;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.util.Map;
+import java.util.Optional;
+
+import static
org.apache.seatunnel.api.env.EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND;
+import static
org.apache.seatunnel.api.env.EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND;
+
+public final class FlowControlStrategy {
-@Getter
-@Setter
-public class FlowControlStrategy {
+ private final int bytesPerSecond;
- int bytesPerSecond;
- int countPreSecond;
+ private final int countPreSecond;
- public FlowControlStrategy(int bytesPerSecond, int countPreSecond) {
+ FlowControlStrategy(int bytesPerSecond, int countPreSecond) {
if (bytesPerSecond <= 0 || countPreSecond <= 0) {
throw new IllegalArgumentException(
"bytesPerSecond and countPreSecond must be positive");
@@ -40,53 +40,78 @@ public class FlowControlStrategy {
this.countPreSecond = countPreSecond;
}
+ public int getBytesPerSecond() {
+ return bytesPerSecond;
+ }
+
+ public int getCountPreSecond() {
+ return countPreSecond;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+
+ private int bytesPerSecond = Integer.MAX_VALUE;
+
+ private int countPreSecond = Integer.MAX_VALUE;
+
+ private Builder() {}
+
+ public Builder bytesPerSecond(int bytesPerSecond) {
+ this.bytesPerSecond = bytesPerSecond;
+ return this;
+ }
+
+ public Builder countPerSecond(int countPreSecond) {
+ this.countPreSecond = countPreSecond;
+ return this;
+ }
+
+ public FlowControlStrategy build() {
+ return new FlowControlStrategy(bytesPerSecond, countPreSecond);
+ }
+ }
+
public static FlowControlStrategy of(int bytesPerSecond, int
countPreSecond) {
- return new FlowControlStrategy(bytesPerSecond, countPreSecond);
+ return FlowControlStrategy.builder()
+ .bytesPerSecond(bytesPerSecond)
+ .countPerSecond(countPreSecond)
+ .build();
}
public static FlowControlStrategy ofBytes(int bytesPerSecond) {
- return new FlowControlStrategy(bytesPerSecond, Integer.MAX_VALUE);
+ return
FlowControlStrategy.builder().bytesPerSecond(bytesPerSecond).build();
}
public static FlowControlStrategy ofCount(int countPreSecond) {
- return new FlowControlStrategy(Integer.MAX_VALUE, countPreSecond);
+ return
FlowControlStrategy.builder().countPerSecond(countPreSecond).build();
}
- // Build the FlowControlStrategy object based on your configured speed
limiting parameters
- public static FlowControlStrategy getFlowControlStrategy(Map<String,
Object> envOption) {
- FlowControlStrategy strategy;
+ public static FlowControlStrategy fromMap(Map<String, Object> envOption) {
+ Builder builder = FlowControlStrategy.builder();
if (envOption == null || envOption.isEmpty()) {
- return null;
+ return builder.build();
+ }
+ final Object bytePerSecond =
envOption.get(READ_LIMIT_BYTES_PER_SECOND.key());
+ final Object countPerSecond =
envOption.get(READ_LIMIT_ROW_PER_SECOND.key());
+ Optional.ofNullable(bytePerSecond)
+ .ifPresent(bps ->
builder.bytesPerSecond(Integer.parseInt(bps.toString())));
+ Optional.ofNullable(countPerSecond)
+ .ifPresent(cps ->
builder.countPerSecond(Integer.parseInt(cps.toString())));
+ return builder.build();
+ }
+
+ public static FlowControlStrategy fromConfig(Config envConfig) {
+ Builder builder = FlowControlStrategy.builder();
+ if (envConfig.hasPath(READ_LIMIT_BYTES_PER_SECOND.key())) {
+
builder.bytesPerSecond(envConfig.getInt(READ_LIMIT_BYTES_PER_SECOND.key()));
}
- if
(envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
- &&
envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
- strategy =
- FlowControlStrategy.of(
- Integer.parseInt(
- envOption
-
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
- .toString()),
- Integer.parseInt(
- envOption
-
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
- .toString()));
- } else if
(envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())) {
- strategy =
- FlowControlStrategy.ofBytes(
- Integer.parseInt(
- envOption
-
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
- .toString()));
- } else if
(envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
- strategy =
- FlowControlStrategy.ofCount(
- Integer.parseInt(
- envOption
-
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
- .toString()));
- } else {
- strategy = null;
+ if (envConfig.hasPath(READ_LIMIT_ROW_PER_SECOND.key())) {
+
builder.countPerSecond(envConfig.getInt(READ_LIMIT_ROW_PER_SECOND.key()));
}
- return strategy;
+ return builder.build();
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 862c0a5e7c..05fde53336 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -56,8 +56,11 @@ public class SinkExecuteProcessor
private static final String PLUGIN_TYPE = PluginType.SINK.getType();
protected SinkExecuteProcessor(
- List<URL> jarPaths, List<? extends Config> pluginConfigs,
JobContext jobContext) {
- super(jarPaths, pluginConfigs, jobContext);
+ List<URL> jarPaths,
+ Config envConfig,
+ List<? extends Config> pluginConfigs,
+ JobContext jobContext) {
+ super(jarPaths, envConfig, pluginConfigs, jobContext);
}
@Override
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 683ff77449..6b72ed3a42 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -44,7 +44,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
protected static final String ENGINE_TYPE = "seatunnel";
protected static final String PLUGIN_NAME_KEY = "plugin_name";
protected static final String SOURCE_TABLE_NAME = "source_table_name";
- protected static HashMap<String, Boolean> isAppendMap = new HashMap<>();
+ protected static HashMap<String, Boolean> IS_APPEND_STREAM_MAP = new
HashMap<>();
protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER
=
(classLoader, url) -> {
@@ -64,12 +64,17 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
protected final List<? extends Config> pluginConfigs;
protected JobContext jobContext;
protected final List<T> plugins;
+ protected final Config envConfig;
protected FlinkAbstractPluginExecuteProcessor(
- List<URL> jarPaths, List<? extends Config> pluginConfigs,
JobContext jobContext) {
+ List<URL> jarPaths,
+ Config envConfig,
+ List<? extends Config> pluginConfigs,
+ JobContext jobContext) {
this.pluginConfigs = pluginConfigs;
this.jobContext = jobContext;
this.plugins = initializePlugins(jarPaths, pluginConfigs);
+ this.envConfig = envConfig;
}
@Override
@@ -98,7 +103,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
TableUtil.tableToDataStream(
tableEnvironment,
table,
- isAppendMap.getOrDefault(tableName, true)),
+
IS_APPEND_STREAM_MAP.getOrDefault(tableName, true)),
dataStreamTableInfo.getCatalogTable(),
tableName));
}
@@ -114,7 +119,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
pluginConfig,
dataStream,
resultTable,
- isAppendMap.getOrDefault(sourceTable, true));
+ IS_APPEND_STREAM_MAP.getOrDefault(sourceTable, true));
registerAppendStream(pluginConfig);
return;
}
@@ -122,14 +127,14 @@ public abstract class
FlinkAbstractPluginExecuteProcessor<T>
pluginConfig,
dataStream,
resultTable,
- isAppendMap.getOrDefault(resultTable, true));
+ IS_APPEND_STREAM_MAP.getOrDefault(resultTable, true));
}
}
protected void registerAppendStream(Config pluginConfig) {
if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
String tableName = pluginConfig.getString(RESULT_TABLE_NAME.key());
- isAppendMap.put(tableName, false);
+ IS_APPEND_STREAM_MAP.put(tableName, false);
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 3e079db3a3..39671af080 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -76,21 +76,24 @@ public class FlinkExecution implements TaskExecution {
} catch (MalformedURLException e) {
throw new SeaTunnelException("load flink starter error.", e);
}
- registerPlugin(config.getConfig("env"));
+ Config envConfig = config.getConfig("env");
+ registerPlugin(envConfig);
JobContext jobContext = new JobContext();
jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
this.sourcePluginExecuteProcessor =
- new SourceExecuteProcessor(jarPaths, config, jobContext);
+ new SourceExecuteProcessor(
+ jarPaths, envConfig,
config.getConfigList(Constants.SOURCE), jobContext);
this.transformPluginExecuteProcessor =
new TransformExecuteProcessor(
jarPaths,
+ envConfig,
TypesafeConfigUtils.getConfigList(
config, Constants.TRANSFORM,
Collections.emptyList()),
jobContext);
this.sinkPluginExecuteProcessor =
new SinkExecuteProcessor(
- jarPaths, config.getConfigList(Constants.SINK),
jobContext);
+ jarPaths, envConfig,
config.getConfigList(Constants.SINK), jobContext);
this.flinkRuntimeEnvironment =
FlinkRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths));
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 6d0055b004..f4af78f81c 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -57,8 +57,11 @@ public class SinkExecuteProcessor
private static final String PLUGIN_TYPE = PluginType.SINK.getType();
protected SinkExecuteProcessor(
- List<URL> jarPaths, List<? extends Config> pluginConfigs,
JobContext jobContext) {
- super(jarPaths, pluginConfigs, jobContext);
+ List<URL> jarPaths,
+ Config envConfig,
+ List<? extends Config> pluginConfigs,
+ JobContext jobContext) {
+ super(jarPaths, envConfig, pluginConfigs, jobContext);
}
@Override
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index df8ca15874..d017b1e204 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
@@ -54,8 +53,12 @@ import static
org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<SourceTableInfo> {
private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
- public SourceExecuteProcessor(List<URL> jarPaths, Config ConfigsInfo,
JobContext jobContext) {
- super(jarPaths, ConfigsInfo.getConfigList(Constants.SOURCE),
jobContext);
+ public SourceExecuteProcessor(
+ List<URL> jarPaths,
+ Config envConfig,
+ List<? extends Config> pluginConfigs,
+ JobContext jobContext) {
+ super(jarPaths, envConfig, pluginConfigs, jobContext);
}
@Override
@@ -70,7 +73,7 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
if (internalSource instanceof SupportCoordinate) {
registerAppendStream(pluginConfig);
}
- FlinkSource flinkSource = new FlinkSource<>(internalSource);
+ FlinkSource flinkSource = new FlinkSource<>(internalSource,
envConfig);
DataStreamSource sourceStream =
executionEnvironment.fromSource(
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 6c145418b4..450599ff7b 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -50,8 +50,11 @@ public class TransformExecuteProcessor
extends FlinkAbstractPluginExecuteProcessor<TableTransformFactory> {
protected TransformExecuteProcessor(
- List<URL> jarPaths, List<? extends Config> pluginConfigs,
JobContext jobContext) {
- super(jarPaths, pluginConfigs, jobContext);
+ List<URL> jarPaths,
+ Config envConfig,
+ List<? extends Config> pluginConfigs,
+ JobContext jobContext) {
+ super(jarPaths, envConfig, pluginConfigs, jobContext);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index aecb64ebfc..f5d4aed1ab 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -93,9 +93,7 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
sourceReceivedBytes = metricsContext.counter(SOURCE_RECEIVED_BYTES);
sourceReceivedBytesPerSeconds =
metricsContext.meter(SOURCE_RECEIVED_BYTES_PER_SECONDS);
- if (flowControlStrategy != null) {
- flowControlGate = FlowControlGate.create(flowControlStrategy);
- }
+ flowControlGate = FlowControlGate.create(flowControlStrategy);
}
@Override
@@ -116,9 +114,7 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
}
sourceReceivedBytes.inc(size);
sourceReceivedBytesPerSeconds.markEvent(size);
- if (flowControlGate != null) {
- flowControlGate.audit((SeaTunnelRow) row);
- }
+ flowControlGate.audit((SeaTunnelRow) row);
}
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 7546d10a7b..53171d4031 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
import
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
@@ -40,8 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import static
org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy.getFlowControlStrategy;
-
public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends
SeaTunnelTask {
private static final ILogger LOGGER =
Logger.getLogger(SourceSeaTunnelTask.class);
@@ -90,7 +89,7 @@ public class SourceSeaTunnelTask<T, SplitT extends
SourceSplit> extends SeaTunne
checkpointLock,
outputs,
this.getMetricsContext(),
- getFlowControlStrategy(envOption),
+ FlowControlStrategy.fromMap(envOption),
sourceProducedType);
((SourceFlowLifeCycle<T, SplitT>)
startFlowLifeCycle).setCollector(collector);
}
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index eb121f6926..b680e4b030 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -17,9 +17,13 @@
package org.apache.seatunnel.translation.flink.source;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
import org.apache.flink.api.connector.source.ReaderOutput;
@@ -35,12 +39,16 @@ public class FlinkRowCollector implements
Collector<SeaTunnelRow> {
private final FlinkRowConverter rowSerialization;
- public FlinkRowCollector(SeaTunnelRowType seaTunnelRowType) {
+ private final FlowControlGate flowControlGate;
+
+ public FlinkRowCollector(SeaTunnelRowType seaTunnelRowType, Config
envConfig) {
this.rowSerialization = new FlinkRowConverter(seaTunnelRowType);
+ this.flowControlGate =
FlowControlGate.create(FlowControlStrategy.fromConfig(envConfig));
}
@Override
public void collect(SeaTunnelRow record) {
+ flowControlGate.audit(record);
try {
readerOutput.collect(rowSerialization.convert(record));
} catch (Exception e) {
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
index 539ea1ed7f..adf54eef4c 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.translation.flink.source;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
@@ -50,8 +52,11 @@ public class FlinkSource<SplitT extends SourceSplit,
EnumStateT extends Serializ
private final SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source;
- public FlinkSource(SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT>
source) {
+ private final Config envConfig;
+
+ public FlinkSource(SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT>
source, Config envConfig) {
this.source = source;
+ this.envConfig = envConfig;
}
@Override
@@ -70,7 +75,7 @@ public class FlinkSource<SplitT extends SourceSplit,
EnumStateT extends Serializ
org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT>
reader =
source.createReader(context);
return new FlinkSourceReader<>(
- reader, context, (SeaTunnelRowType) source.getProducedType());
+ reader, context, envConfig, (SeaTunnelRowType)
source.getProducedType());
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index 50ca18c93f..16b92fb920 100644
---
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.translation.flink.source;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -56,10 +58,11 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
public FlinkSourceReader(
org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT>
sourceReader,
org.apache.seatunnel.api.source.SourceReader.Context context,
+ Config envConfig,
SeaTunnelRowType seaTunnelRowType) {
this.sourceReader = sourceReader;
this.context = context;
- this.flinkRowCollector = new FlinkRowCollector(seaTunnelRowType);
+ this.flinkRowCollector = new FlinkRowCollector(seaTunnelRowType,
envConfig);
}
@Override
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
index 39782f174c..0bbcd3a6fe 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
@@ -29,8 +29,6 @@ import org.apache.spark.sql.catalyst.InternalRow;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-import static
org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy.getFlowControlStrategy;
-
public class InternalRowCollector implements Collector<SeaTunnelRow> {
private final Handover<InternalRow> handover;
private final Object checkpointLock;
@@ -50,19 +48,14 @@ public class InternalRowCollector implements
Collector<SeaTunnelRow> {
this.rowSerialization = new InternalRowConverter(dataType);
this.collectTotalCount = new AtomicLong(0);
this.envOptions = (Map) envOptionsInfo;
- FlowControlStrategy flowControlStrategy =
getFlowControlStrategy(envOptions);
- if (flowControlStrategy != null) {
- this.flowControlGate = FlowControlGate.create(flowControlStrategy);
- }
+ this.flowControlGate =
FlowControlGate.create(FlowControlStrategy.fromMap(envOptions));
}
@Override
public void collect(SeaTunnelRow record) {
try {
synchronized (checkpointLock) {
- if (flowControlGate != null) {
- flowControlGate.audit(record);
- }
+ flowControlGate.audit(record);
handover.produce(rowSerialization.convert(record));
}
collectTotalCount.incrementAndGet();