Hisoka-X commented on code in PR #5509:
URL: https://github.com/apache/seatunnel/pull/5509#discussion_r1330865359
##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java:
##########
@@ -52,10 +53,11 @@
public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<SeaTunnelSource> {
private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
+ private Config envConfis;
- public SourceExecuteProcessor(
- List<URL> jarPaths, List<? extends Config> sourceConfigs,
JobContext jobContext) {
- super(jarPaths, sourceConfigs, jobContext);
+ public SourceExecuteProcessor(List<URL> jarPaths, Config ConfigsInfo,
JobContext jobContext) {
Review Comment:
```suggestion
public SourceExecuteProcessor(List<URL> jarPaths, Config configsInfo,
JobContext jobContext) {
```
##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java:
##########
@@ -63,12 +68,25 @@ public abstract class BaseSeaTunnelSourceFunction extends
RichSourceFunction<Row
protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
+ // Env Conf Info
+ private Config envConfis;
+ // Store env for external Settings
+ private Map<String, Object> envOption = new HashMap<>();
/** Flag indicating whether the consumer is still running. */
private volatile boolean running = true;
- public BaseSeaTunnelSourceFunction(SeaTunnelSource<SeaTunnelRow, ?, ?>
source) {
+ public BaseSeaTunnelSourceFunction(
+ SeaTunnelSource<SeaTunnelRow, ?, ?> source, Config
sourceEnvConfis) {
Review Comment:
ditto
##########
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java:
##########
@@ -47,7 +47,7 @@ public class SourceSeaTunnelTask<T, SplitT extends
SourceSplit> extends SeaTunne
private transient Object checkpointLock;
@Getter private transient Serializer<SplitT> splitSerializer;
- private final Map<String, Object> envOption;
+ private final transient Map<String, Object> envOption;
Review Comment:
Any reason for add `transient`?
##########
seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java:
##########
@@ -68,10 +70,11 @@ public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams)
Config pluginConfig = pluginConfigs.get(i);
BaseSeaTunnelSourceFunction sourceFunction;
if (internalSource instanceof SupportCoordinate) {
- sourceFunction = new
SeaTunnelCoordinatedSource(internalSource);
+ sourceFunction = new
SeaTunnelCoordinatedSource(internalSource, envConfis);
Review Comment:
```suggestion
sourceFunction = new
SeaTunnelCoordinatedSource(internalSource, envConfigs);
```
##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelCoordinatedSource.java:
##########
@@ -27,9 +29,10 @@ public class SeaTunnelCoordinatedSource extends
BaseSeaTunnelSourceFunction {
protected static final String COORDINATED_SOURCE_STATE_NAME =
"coordinated-source-states";
- public SeaTunnelCoordinatedSource(SeaTunnelSource<SeaTunnelRow, ?, ?>
source) {
+ public SeaTunnelCoordinatedSource(
+ SeaTunnelSource<SeaTunnelRow, ?, ?> source, Config envConfis) {
// TODO: Make sure the source is coordinated.
- super(source);
+ super(source, envConfis);
Review Comment:
ditto
##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/BaseSeaTunnelSourceFunction.java:
##########
@@ -63,12 +68,25 @@ public abstract class BaseSeaTunnelSourceFunction extends
RichSourceFunction<Row
protected final AtomicLong latestCompletedCheckpointId = new AtomicLong(0);
protected final AtomicLong latestTriggerCheckpointId = new AtomicLong(0);
+ // Env Conf Info
+ private Config envConfis;
Review Comment:
ditto
##########
seatunnel-translation/seatunnel-translation-flink/seatunnel-translation-flink-common/src/main/java/org/apache/seatunnel/translation/flink/source/SeaTunnelParallelSource.java:
##########
@@ -31,9 +33,9 @@ public class SeaTunnelParallelSource extends
BaseSeaTunnelSourceFunction
protected static final String PARALLEL_SOURCE_STATE_NAME =
"parallel-source-states";
- public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source)
{
+ public SeaTunnelParallelSource(SeaTunnelSource<SeaTunnelRow, ?, ?> source,
Config envConfis) {
// TODO: Make sure the source is uncoordinated.
- super(source);
+ super(source, envConfis);
Review Comment:
ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]