Hisoka-X commented on code in PR #5509:
URL: https://github.com/apache/seatunnel/pull/5509#discussion_r1330865359


##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java:
##########
@@ -52,10 +53,11 @@
 
 public class SourceExecuteProcessor extends 
FlinkAbstractPluginExecuteProcessor<SeaTunnelSource> {
     private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
+    private Config envConfis;
 
-    public SourceExecuteProcessor(
-            List<URL> jarPaths, List<? extends Config> sourceConfigs, 
JobContext jobContext) {
-        super(jarPaths, sourceConfigs, jobContext);
+    public SourceExecuteProcessor(List<URL> jarPaths, Config ConfigsInfo, 
JobContext jobContext) {

Review Comment:
   ```suggestion
       public SourceExecuteProcessor(List<URL> jarPaths, Config configsInfo, 
JobContext jobContext) {
   ```



##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java:
##########
@@ -63,12 +68,25 @@ public abstract class BaseSeaTunnelSourceFunction extends 
RichSourceFunction<Row
 
     protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
     protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
+    // Env Conf Info
+    private Config envConfis;
+    // Store env for external Settings
+    private Map<String, Object> envOption = new HashMap<>();
 
     /** Flag indicating whether the consumer is still running. */
     private volatile boolean running = true;
 
-    public BaseSeaTunnelSourceFunction(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source) {
+    public BaseSeaTunnelSourceFunction(
+            SeaTunnelSource<SeaTunnelRow, ?, ?> source, Config 
sourceEnvConfis) {

Review Comment:
   ditto: same naming typo here (`sourceEnvConfis` -> `sourceEnvConfigs`)



##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java:
##########
@@ -47,7 +47,7 @@ public class SourceSeaTunnelTask<T, SplitT extends 
SourceSplit> extends SeaTunne
 
     private transient Object checkpointLock;
     @Getter private transient Serializer<SplitT> splitSerializer;
-    private final Map<String, Object> envOption;
+    private final transient Map<String, Object> envOption;

Review Comment:
   Any reason for adding `transient`?



##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java:
##########
@@ -68,10 +70,11 @@ public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams)
             Config pluginConfig = pluginConfigs.get(i);
             BaseSeaTunnelSourceFunction sourceFunction;
             if (internalSource instanceof SupportCoordinate) {
-                sourceFunction = new 
SeaTunnelCoordinatedSource(internalSource);
+                sourceFunction = new 
SeaTunnelCoordinatedSource(internalSource, envConfis);

Review Comment:
   ```suggestion
                   sourceFunction = new 
SeaTunnelCoordinatedSource(internalSource, envConfigs);
   ```



##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java:
##########
@@ -27,9 +29,10 @@ public class SeaTunnelCoordinatedSource extends 
BaseSeaTunnelSourceFunction {
 
     protected static final String COORDINATED_SOURCE_STATE_NAME = 
"coordinated-source-states";
 
-    public SeaTunnelCoordinatedSource(SeaTunnelSource<SeaTunnelRow, ?, ?> 
source) {
+    public SeaTunnelCoordinatedSource(
+            SeaTunnelSource<SeaTunnelRow, ?, ?> source, Config envConfis) {
         // TODO: Make sure the source is coordinated.
-        super(source);
+        super(source, envConfis);

Review Comment:
   ditto: rename `envConfis` to `envConfigs`



##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java:
##########
@@ -63,12 +68,25 @@ public abstract class BaseSeaTunnelSourceFunction extends 
RichSourceFunction<Row
 
     protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
     protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
+    // Env Conf Info
+    private Config envConfis;

Review Comment:
   ditto: rename `envConfis` to `envConfigs`



##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java:
##########
@@ -31,9 +33,9 @@ public class SeaTunnelParallelSource extends 
BaseSeaTunnelSourceFunction
 
     protected static final String PARALLEL_SOURCE_STATE_NAME = 
"parallel-source-states";
 
-    public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source) 
{
+    public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source, 
Config envConfis) {
         // TODO: Make sure the source is uncoordinated.
-        super(source);
+        super(source, envConfis);

Review Comment:
   ditto: rename `envConfis` to `envConfigs`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to