lsyldliu commented on code in PR #23282:
URL: https://github.com/apache/flink/pull/23282#discussion_r1309575673


##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -53,45 +52,41 @@ public class BatchExecTableSourceScan extends 
CommonExecTableSourceScan
         implements BatchExecNode<RowData> {
 
     // Avoids creating different ids if translated multiple times
-    private final String dynamicFilteringDataListenerID = 
UUID.randomUUID().toString();
+    @Nullable private final String dynamicFilteringDataListenerID;

Review Comment:
   If we don't make this change, it also can work, so I think we can keep 
original code here, I don't recommend exposing the 
dynamicFilteringDataListenerID to the developer.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -53,45 +52,41 @@ public class BatchExecTableSourceScan extends 
CommonExecTableSourceScan
         implements BatchExecNode<RowData> {
 
     // Avoids creating different ids if translated multiple times
-    private final String dynamicFilteringDataListenerID = 
UUID.randomUUID().toString();
+    @Nullable private final String dynamicFilteringDataListenerID;
 
-    private boolean needDynamicFilteringDependency;
+    private final ReadableConfig tableConfig;
 
     // This constructor can be used only when table source scan has
     // BatchExecDynamicFilteringDataCollector input
     public BatchExecTableSourceScan(
             ReadableConfig tableConfig,
             DynamicTableSourceSpec tableSourceSpec,
-            InputProperty inputProperty,
+            List<InputProperty> inputProperties,
             RowType outputType,
-            String description) {
+            String description,
+            @Nullable String dynamicFilteringDataListenerID) {
         super(
                 ExecNodeContext.newNodeId(),
                 ExecNodeContext.newContext(BatchExecTableSourceScan.class),
                 
ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
                 tableSourceSpec,
-                Collections.singletonList(inputProperty),
+                inputProperties,
                 outputType,
                 description);
+        this.dynamicFilteringDataListenerID = dynamicFilteringDataListenerID;
+        this.tableConfig = tableConfig;
     }
 
     public BatchExecTableSourceScan(
             ReadableConfig tableConfig,
             DynamicTableSourceSpec tableSourceSpec,
             RowType outputType,
             String description) {
-        super(
-                ExecNodeContext.newNodeId(),
-                ExecNodeContext.newContext(BatchExecTableSourceScan.class),
-                
ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
-                tableSourceSpec,
-                Collections.emptyList(),
-                outputType,
-                description);
+        this(tableConfig, tableSourceSpec, Collections.emptyList(), 
outputType, description, null);

Review Comment:
   I think the original constructor can work for our needs, just needs to do a 
small enforcement.
   ```
       public BatchExecTableSourceScan(
               ReadableConfig tableConfig,
               DynamicTableSourceSpec tableSourceSpec,
               RowType outputType,
               String description) {
           super(
                   ExecNodeContext.newNodeId(),
                   ExecNodeContext.newContext(BatchExecTableSourceScan.class),
                   
ExecNodeContext.newPersistedConfig(BatchExecTableSourceScan.class, tableConfig),
                   tableSourceSpec,
                   Collections.emptyList(),
                   outputType,
                   description);
           this.tableConfig = tableConfig;
       }
   ```



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecTableSourceScan.java:
##########
@@ -177,4 +132,17 @@ public Transformation<RowData> 
createInputFormatTransformation(
                 new InputFormatSourceFunction<>(inputFormat, outputTypeInfo);
         return env.addSource(function, operatorName, 
outputTypeInfo).getTransformation();
     }
+
+    public BatchExecTableSourceScan copyAndRemoveInputs() {
+        BatchExecTableSourceScan tableSourceScan =

Review Comment:
   We can reuse the contructor `BatchExecTableSourceScan(
               ReadableConfig tableConfig,
               DynamicTableSourceSpec tableSourceSpec,
               RowType outputType,
               String description)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to