This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3add2126b6 [Chore][Core] Clean flow control code (#5991)
3add2126b6 is described below

commit 3add2126b69a3997a2d626c55d60c873ef01d3cc
Author: Tyrantlucifer <[email protected]>
AuthorDate: Tue Dec 12 15:17:11 2023 +0800

    [Chore][Core] Clean flow control code (#5991)
---
 .../core/starter/flowcontrol/FlowControlGate.java  |  24 +++--
 .../starter/flowcontrol/FlowControlStrategy.java   | 117 +++++++++++++--------
 .../flink/execution/SinkExecuteProcessor.java      |   7 +-
 .../FlinkAbstractPluginExecuteProcessor.java       |  17 +--
 .../starter/flink/execution/FlinkExecution.java    |   9 +-
 .../flink/execution/SinkExecuteProcessor.java      |   7 +-
 .../flink/execution/SourceExecuteProcessor.java    |  11 +-
 .../flink/execution/TransformExecuteProcessor.java |   7 +-
 .../server/task/SeaTunnelSourceCollector.java      |   8 +-
 .../engine/server/task/SourceSeaTunnelTask.java    |   5 +-
 .../flink/source/FlinkRowCollector.java            |  10 +-
 .../translation/flink/source/FlinkSource.java      |   9 +-
 .../flink/source/FlinkSourceReader.java            |   5 +-
 .../spark/serialization/InternalRowCollector.java  |  11 +-
 14 files changed, 154 insertions(+), 93 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
index 10b2301329..5d06366e3a 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlGate.java
@@ -21,19 +21,31 @@ import 
org.apache.seatunnel.shade.com.google.common.util.concurrent.RateLimiter;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
+import java.util.Optional;
+
 public class FlowControlGate {
 
-    private final RateLimiter bytesRateLimiter;
-    private final RateLimiter countRateLimiter;
+    private static final int DEFAULT_VALUE = Integer.MAX_VALUE;
+
+    private final Optional<RateLimiter> bytesRateLimiter;
+    private final Optional<RateLimiter> countRateLimiter;
 
     private FlowControlGate(FlowControlStrategy flowControlStrategy) {
-        this.bytesRateLimiter = 
RateLimiter.create(flowControlStrategy.getBytesPerSecond());
-        this.countRateLimiter = 
RateLimiter.create(flowControlStrategy.getCountPreSecond());
+        final int bytesPerSecond = flowControlStrategy.getBytesPerSecond();
+        final int countPreSecond = flowControlStrategy.getCountPreSecond();
+        this.bytesRateLimiter =
+                bytesPerSecond == DEFAULT_VALUE
+                        ? Optional.empty()
+                        : Optional.of(RateLimiter.create(bytesPerSecond));
+        this.countRateLimiter =
+                countPreSecond == DEFAULT_VALUE
+                        ? Optional.empty()
+                        : Optional.of(RateLimiter.create(countPreSecond));
     }
 
     public void audit(SeaTunnelRow row) {
-        bytesRateLimiter.acquire(row.getBytesSize());
-        countRateLimiter.acquire();
+        bytesRateLimiter.ifPresent(rateLimiter -> 
rateLimiter.acquire(row.getBytesSize()));
+        countRateLimiter.ifPresent(RateLimiter::acquire);
     }
 
     public static FlowControlGate create(FlowControlStrategy 
flowControlStrategy) {
diff --git 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
index 9ab0d041fd..923dccee0d 100644
--- 
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/flowcontrol/FlowControlStrategy.java
@@ -17,21 +17,21 @@
 
 package org.apache.seatunnel.core.starter.flowcontrol;
 
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-
-import lombok.Getter;
-import lombok.Setter;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.seatunnel.api.env.EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND;
+import static 
org.apache.seatunnel.api.env.EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND;
+
+public final class FlowControlStrategy {
 
-@Getter
-@Setter
-public class FlowControlStrategy {
+    private final int bytesPerSecond;
 
-    int bytesPerSecond;
-    int countPreSecond;
+    private final int countPreSecond;
 
-    public FlowControlStrategy(int bytesPerSecond, int countPreSecond) {
+    FlowControlStrategy(int bytesPerSecond, int countPreSecond) {
         if (bytesPerSecond <= 0 || countPreSecond <= 0) {
             throw new IllegalArgumentException(
                     "bytesPerSecond and countPreSecond must be positive");
@@ -40,53 +40,78 @@ public class FlowControlStrategy {
         this.countPreSecond = countPreSecond;
     }
 
+    public int getBytesPerSecond() {
+        return bytesPerSecond;
+    }
+
+    public int getCountPreSecond() {
+        return countPreSecond;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static class Builder {
+
+        private int bytesPerSecond = Integer.MAX_VALUE;
+
+        private int countPreSecond = Integer.MAX_VALUE;
+
+        private Builder() {}
+
+        public Builder bytesPerSecond(int bytesPerSecond) {
+            this.bytesPerSecond = bytesPerSecond;
+            return this;
+        }
+
+        public Builder countPerSecond(int countPreSecond) {
+            this.countPreSecond = countPreSecond;
+            return this;
+        }
+
+        public FlowControlStrategy build() {
+            return new FlowControlStrategy(bytesPerSecond, countPreSecond);
+        }
+    }
+
     public static FlowControlStrategy of(int bytesPerSecond, int 
countPreSecond) {
-        return new FlowControlStrategy(bytesPerSecond, countPreSecond);
+        return FlowControlStrategy.builder()
+                .bytesPerSecond(bytesPerSecond)
+                .countPerSecond(countPreSecond)
+                .build();
     }
 
     public static FlowControlStrategy ofBytes(int bytesPerSecond) {
-        return new FlowControlStrategy(bytesPerSecond, Integer.MAX_VALUE);
+        return 
FlowControlStrategy.builder().bytesPerSecond(bytesPerSecond).build();
     }
 
     public static FlowControlStrategy ofCount(int countPreSecond) {
-        return new FlowControlStrategy(Integer.MAX_VALUE, countPreSecond);
+        return 
FlowControlStrategy.builder().countPerSecond(countPreSecond).build();
     }
 
-    // Build the FlowControlStrategy object based on your configured speed 
limiting parameters
-    public static FlowControlStrategy getFlowControlStrategy(Map<String, 
Object> envOption) {
-        FlowControlStrategy strategy;
+    public static FlowControlStrategy fromMap(Map<String, Object> envOption) {
+        Builder builder = FlowControlStrategy.builder();
         if (envOption == null || envOption.isEmpty()) {
-            return null;
+            return builder.build();
+        }
+        final Object bytePerSecond = 
envOption.get(READ_LIMIT_BYTES_PER_SECOND.key());
+        final Object countPerSecond = 
envOption.get(READ_LIMIT_ROW_PER_SECOND.key());
+        Optional.ofNullable(bytePerSecond)
+                .ifPresent(bps -> 
builder.bytesPerSecond(Integer.parseInt(bps.toString())));
+        Optional.ofNullable(countPerSecond)
+                .ifPresent(cps -> 
builder.countPerSecond(Integer.parseInt(cps.toString())));
+        return builder.build();
+    }
+
+    public static FlowControlStrategy fromConfig(Config envConfig) {
+        Builder builder = FlowControlStrategy.builder();
+        if (envConfig.hasPath(READ_LIMIT_BYTES_PER_SECOND.key())) {
+            
builder.bytesPerSecond(envConfig.getInt(READ_LIMIT_BYTES_PER_SECOND.key()));
         }
-        if 
(envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
-                && 
envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
-            strategy =
-                    FlowControlStrategy.of(
-                            Integer.parseInt(
-                                    envOption
-                                            
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
-                                            .toString()),
-                            Integer.parseInt(
-                                    envOption
-                                            
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
-                                            .toString()));
-        } else if 
(envOption.containsKey(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())) {
-            strategy =
-                    FlowControlStrategy.ofBytes(
-                            Integer.parseInt(
-                                    envOption
-                                            
.get(EnvCommonOptions.READ_LIMIT_BYTES_PER_SECOND.key())
-                                            .toString()));
-        } else if 
(envOption.containsKey(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())) {
-            strategy =
-                    FlowControlStrategy.ofCount(
-                            Integer.parseInt(
-                                    envOption
-                                            
.get(EnvCommonOptions.READ_LIMIT_ROW_PER_SECOND.key())
-                                            .toString()));
-        } else {
-            strategy = null;
+        if (envConfig.hasPath(READ_LIMIT_ROW_PER_SECOND.key())) {
+            
builder.countPerSecond(envConfig.getInt(READ_LIMIT_ROW_PER_SECOND.key()));
         }
-        return strategy;
+        return builder.build();
     }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 862c0a5e7c..05fde53336 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -56,8 +56,11 @@ public class SinkExecuteProcessor
     private static final String PLUGIN_TYPE = PluginType.SINK.getType();
 
     protected SinkExecuteProcessor(
-            List<URL> jarPaths, List<? extends Config> pluginConfigs, 
JobContext jobContext) {
-        super(jarPaths, pluginConfigs, jobContext);
+            List<URL> jarPaths,
+            Config envConfig,
+            List<? extends Config> pluginConfigs,
+            JobContext jobContext) {
+        super(jarPaths, envConfig, pluginConfigs, jobContext);
     }
 
     @Override
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
index 683ff77449..6b72ed3a42 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
@@ -44,7 +44,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
     protected static final String ENGINE_TYPE = "seatunnel";
     protected static final String PLUGIN_NAME_KEY = "plugin_name";
     protected static final String SOURCE_TABLE_NAME = "source_table_name";
-    protected static HashMap<String, Boolean> isAppendMap = new HashMap<>();
+    protected static HashMap<String, Boolean> IS_APPEND_STREAM_MAP = new 
HashMap<>();
 
     protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER 
=
             (classLoader, url) -> {
@@ -64,12 +64,17 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
     protected final List<? extends Config> pluginConfigs;
     protected JobContext jobContext;
     protected final List<T> plugins;
+    protected final Config envConfig;
 
     protected FlinkAbstractPluginExecuteProcessor(
-            List<URL> jarPaths, List<? extends Config> pluginConfigs, 
JobContext jobContext) {
+            List<URL> jarPaths,
+            Config envConfig,
+            List<? extends Config> pluginConfigs,
+            JobContext jobContext) {
         this.pluginConfigs = pluginConfigs;
         this.jobContext = jobContext;
         this.plugins = initializePlugins(jarPaths, pluginConfigs);
+        this.envConfig = envConfig;
     }
 
     @Override
@@ -98,7 +103,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
                             TableUtil.tableToDataStream(
                                     tableEnvironment,
                                     table,
-                                    isAppendMap.getOrDefault(tableName, true)),
+                                    
IS_APPEND_STREAM_MAP.getOrDefault(tableName, true)),
                             dataStreamTableInfo.getCatalogTable(),
                             tableName));
         }
@@ -114,7 +119,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
                         pluginConfig,
                         dataStream,
                         resultTable,
-                        isAppendMap.getOrDefault(sourceTable, true));
+                        IS_APPEND_STREAM_MAP.getOrDefault(sourceTable, true));
                 registerAppendStream(pluginConfig);
                 return;
             }
@@ -122,14 +127,14 @@ public abstract class 
FlinkAbstractPluginExecuteProcessor<T>
                     pluginConfig,
                     dataStream,
                     resultTable,
-                    isAppendMap.getOrDefault(resultTable, true));
+                    IS_APPEND_STREAM_MAP.getOrDefault(resultTable, true));
         }
     }
 
     protected void registerAppendStream(Config pluginConfig) {
         if (pluginConfig.hasPath(RESULT_TABLE_NAME.key())) {
             String tableName = pluginConfig.getString(RESULT_TABLE_NAME.key());
-            isAppendMap.put(tableName, false);
+            IS_APPEND_STREAM_MAP.put(tableName, false);
         }
     }
 
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 3e079db3a3..39671af080 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -76,21 +76,24 @@ public class FlinkExecution implements TaskExecution {
         } catch (MalformedURLException e) {
             throw new SeaTunnelException("load flink starter error.", e);
         }
-        registerPlugin(config.getConfig("env"));
+        Config envConfig = config.getConfig("env");
+        registerPlugin(envConfig);
         JobContext jobContext = new JobContext();
         jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
 
         this.sourcePluginExecuteProcessor =
-                new SourceExecuteProcessor(jarPaths, config, jobContext);
+                new SourceExecuteProcessor(
+                        jarPaths, envConfig, 
config.getConfigList(Constants.SOURCE), jobContext);
         this.transformPluginExecuteProcessor =
                 new TransformExecuteProcessor(
                         jarPaths,
+                        envConfig,
                         TypesafeConfigUtils.getConfigList(
                                 config, Constants.TRANSFORM, 
Collections.emptyList()),
                         jobContext);
         this.sinkPluginExecuteProcessor =
                 new SinkExecuteProcessor(
-                        jarPaths, config.getConfigList(Constants.SINK), 
jobContext);
+                        jarPaths, envConfig, 
config.getConfigList(Constants.SINK), jobContext);
 
         this.flinkRuntimeEnvironment =
                 
FlinkRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths));
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 6d0055b004..f4af78f81c 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -57,8 +57,11 @@ public class SinkExecuteProcessor
     private static final String PLUGIN_TYPE = PluginType.SINK.getType();
 
     protected SinkExecuteProcessor(
-            List<URL> jarPaths, List<? extends Config> pluginConfigs, 
JobContext jobContext) {
-        super(jarPaths, pluginConfigs, jobContext);
+            List<URL> jarPaths,
+            Config envConfig,
+            List<? extends Config> pluginConfigs,
+            JobContext jobContext) {
+        super(jarPaths, envConfig, pluginConfigs, jobContext);
     }
 
     @Override
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index df8ca15874..d017b1e204 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportCoordinate;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.core.starter.enums.PluginType;
 import org.apache.seatunnel.core.starter.execution.PluginUtil;
 import org.apache.seatunnel.core.starter.execution.SourceTableInfo;
@@ -54,8 +53,12 @@ import static 
org.apache.seatunnel.api.common.CommonOptions.RESULT_TABLE_NAME;
 public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<SourceTableInfo> {
     private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
 
-    public SourceExecuteProcessor(List<URL> jarPaths, Config ConfigsInfo, 
JobContext jobContext) {
-        super(jarPaths, ConfigsInfo.getConfigList(Constants.SOURCE), 
jobContext);
+    public SourceExecuteProcessor(
+            List<URL> jarPaths,
+            Config envConfig,
+            List<? extends Config> pluginConfigs,
+            JobContext jobContext) {
+        super(jarPaths, envConfig, pluginConfigs, jobContext);
     }
 
     @Override
@@ -70,7 +73,7 @@ public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<
             if (internalSource instanceof SupportCoordinate) {
                 registerAppendStream(pluginConfig);
             }
-            FlinkSource flinkSource = new FlinkSource<>(internalSource);
+            FlinkSource flinkSource = new FlinkSource<>(internalSource, 
envConfig);
 
             DataStreamSource sourceStream =
                     executionEnvironment.fromSource(
diff --git 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index 6c145418b4..450599ff7b 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -50,8 +50,11 @@ public class TransformExecuteProcessor
         extends FlinkAbstractPluginExecuteProcessor<TableTransformFactory> {
 
     protected TransformExecuteProcessor(
-            List<URL> jarPaths, List<? extends Config> pluginConfigs, 
JobContext jobContext) {
-        super(jarPaths, pluginConfigs, jobContext);
+            List<URL> jarPaths,
+            Config envConfig,
+            List<? extends Config> pluginConfigs,
+            JobContext jobContext) {
+        super(jarPaths, envConfig, pluginConfigs, jobContext);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index aecb64ebfc..f5d4aed1ab 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -93,9 +93,7 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
         sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
         sourceReceivedBytes = metricsContext.counter(SOURCE_RECEIVED_BYTES);
         sourceReceivedBytesPerSeconds = 
metricsContext.meter(SOURCE_RECEIVED_BYTES_PER_SECONDS);
-        if (flowControlStrategy != null) {
-            flowControlGate = FlowControlGate.create(flowControlStrategy);
-        }
+        flowControlGate = FlowControlGate.create(flowControlStrategy);
     }
 
     @Override
@@ -116,9 +114,7 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
                 }
                 sourceReceivedBytes.inc(size);
                 sourceReceivedBytesPerSeconds.markEvent(size);
-                if (flowControlGate != null) {
-                    flowControlGate.audit((SeaTunnelRow) row);
-                }
+                flowControlGate.audit((SeaTunnelRow) row);
             }
             sendRecordToNext(new Record<>(row));
             emptyThisPollNext = false;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 7546d10a7b..53171d4031 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
 import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
 import 
org.apache.seatunnel.engine.server.dag.physical.flow.PhysicalExecutionFlow;
@@ -40,8 +41,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
-import static 
org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy.getFlowControlStrategy;
-
 public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends 
SeaTunnelTask {
 
     private static final ILogger LOGGER = 
Logger.getLogger(SourceSeaTunnelTask.class);
@@ -90,7 +89,7 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
                             checkpointLock,
                             outputs,
                             this.getMetricsContext(),
-                            getFlowControlStrategy(envOption),
+                            FlowControlStrategy.fromMap(envOption),
                             sourceProducedType);
             ((SourceFlowLifeCycle<T, SplitT>) 
startFlowLifeCycle).setCollector(collector);
         }
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
index eb121f6926..b680e4b030 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkRowCollector.java
@@ -17,9 +17,13 @@
 
 package org.apache.seatunnel.translation.flink.source;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
+import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
 import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
 
 import org.apache.flink.api.connector.source.ReaderOutput;
@@ -35,12 +39,16 @@ public class FlinkRowCollector implements 
Collector<SeaTunnelRow> {
 
     private final FlinkRowConverter rowSerialization;
 
-    public FlinkRowCollector(SeaTunnelRowType seaTunnelRowType) {
+    private final FlowControlGate flowControlGate;
+
+    public FlinkRowCollector(SeaTunnelRowType seaTunnelRowType, Config 
envConfig) {
         this.rowSerialization = new FlinkRowConverter(seaTunnelRowType);
+        this.flowControlGate = 
FlowControlGate.create(FlowControlStrategy.fromConfig(envConfig));
     }
 
     @Override
     public void collect(SeaTunnelRow record) {
+        flowControlGate.audit(record);
         try {
             readerOutput.collect(rowSerialization.convert(record));
         } catch (Exception e) {
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
index 539ea1ed7f..adf54eef4c 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSource.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.translation.flink.source;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
@@ -50,8 +52,11 @@ public class FlinkSource<SplitT extends SourceSplit, 
EnumStateT extends Serializ
 
     private final SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source;
 
-    public FlinkSource(SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> 
source) {
+    private final Config envConfig;
+
+    public FlinkSource(SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> 
source, Config envConfig) {
         this.source = source;
+        this.envConfig = envConfig;
     }
 
     @Override
@@ -70,7 +75,7 @@ public class FlinkSource<SplitT extends SourceSplit, 
EnumStateT extends Serializ
         org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> 
reader =
                 source.createReader(context);
         return new FlinkSourceReader<>(
-                reader, context, (SeaTunnelRowType) source.getProducedType());
+                reader, context, envConfig, (SeaTunnelRowType) 
source.getProducedType());
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
index 50ca18c93f..16b92fb920 100644
--- 
a/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
+++ 
b/seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/FlinkSourceReader.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.translation.flink.source;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -56,10 +58,11 @@ public class FlinkSourceReader<SplitT extends SourceSplit>
     public FlinkSourceReader(
             org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> 
sourceReader,
             org.apache.seatunnel.api.source.SourceReader.Context context,
+            Config envConfig,
             SeaTunnelRowType seaTunnelRowType) {
         this.sourceReader = sourceReader;
         this.context = context;
-        this.flinkRowCollector = new FlinkRowCollector(seaTunnelRowType);
+        this.flinkRowCollector = new FlinkRowCollector(seaTunnelRowType, 
envConfig);
     }
 
     @Override
diff --git 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
index 39782f174c..0bbcd3a6fe 100644
--- 
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
+++ 
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/InternalRowCollector.java
@@ -29,8 +29,6 @@ import org.apache.spark.sql.catalyst.InternalRow;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static 
org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy.getFlowControlStrategy;
-
 public class InternalRowCollector implements Collector<SeaTunnelRow> {
     private final Handover<InternalRow> handover;
     private final Object checkpointLock;
@@ -50,19 +48,14 @@ public class InternalRowCollector implements 
Collector<SeaTunnelRow> {
         this.rowSerialization = new InternalRowConverter(dataType);
         this.collectTotalCount = new AtomicLong(0);
         this.envOptions = (Map) envOptionsInfo;
-        FlowControlStrategy flowControlStrategy = 
getFlowControlStrategy(envOptions);
-        if (flowControlStrategy != null) {
-            this.flowControlGate = FlowControlGate.create(flowControlStrategy);
-        }
+        this.flowControlGate = 
FlowControlGate.create(FlowControlStrategy.fromMap(envOptions));
     }
 
     @Override
     public void collect(SeaTunnelRow record) {
         try {
             synchronized (checkpointLock) {
-                if (flowControlGate != null) {
-                    flowControlGate.audit(record);
-                }
+                flowControlGate.audit(record);
                 handover.produce(rowSerialization.convert(record));
             }
             collectTotalCount.incrementAndGet();

Reply via email to