Airblader commented on a change in pull request #17118:
URL: https://github.com/apache/flink/pull/17118#discussion_r701252602



##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
##########
@@ -116,6 +118,9 @@ protected RexNode createRemainingCondition(
                 convertExpressionToRexNode(result.getAcceptedFilters(), relBuilder);
         FilterPushDownSpec filterPushDownSpec = new FilterPushDownSpec(acceptedPredicates);
 
+        ArrayList<String> acceptedExprStrs = new ArrayList<>();

Review comment:
       Declare this as `final List<String>`: use the `List` interface as the declared type rather than the `ArrayList` implementation.
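
A minimal sketch of the declaration I have in mind. The class `AcceptedExprCollector` and its `collect` method are hypothetical and only illustrate the pattern; they are not part of the PR, and plain objects stand in for the accepted filter expressions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
class AcceptedExprCollector {
    // Collects the string form of each accepted filter.
    static List<String> collect(List<?> acceptedFilters) {
        // The declared type is the List interface; only the constructor
        // call names the concrete ArrayList implementation.
        final List<String> acceptedExprStrs = new ArrayList<>();
        for (Object filter : acceptedFilters) {
            acceptedExprStrs.add(String.valueOf(filter));
        }
        return acceptedExprStrs;
    }
}
```

Callers then depend only on `List`, so the implementation can change without touching them.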

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
##########
@@ -56,4 +57,12 @@
      */
     @JsonIgnore
     Optional<RowType> getProducedType();
+
+    /**
+     * When Filter,Limit,Partition,etc. are pushed into {@link 
DynamicTableSource}, the source will

Review comment:
       This description breaks the abstraction because it references specific implementations. How about this?
   
   > Additional digests to generate when this spec is applied to the source.

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.java
##########
@@ -82,7 +86,8 @@ protected FlinkLogicalTableSourceScan getNewScan(
             FlinkLogicalTableSourceScan scan,
             TableConfig tableConfig,
             boolean useWatermarkAssignerRowType) {
-        String digest = String.format("watermark=[%s]", watermarkExpr);
+
+        Optional<List<String>> watermarkExprStrs;

Review comment:
       This can be `final`.
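
A rough sketch of why `final` works here: Java allows a `final` local to be declared without an initializer as long as every branch assigns it exactly once. The class `WatermarkDigestSketch` and its parameters are hypothetical stand-ins, not the actual rule code.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the branch structure in getNewScan.
class WatermarkDigestSketch {
    static Optional<List<String>> exprStrs(boolean useSourceWatermark, String watermarkExpr) {
        // Declared final without an initializer; each branch assigns it once.
        final Optional<List<String>> watermarkExprStrs;
        if (useSourceWatermark) {
            watermarkExprStrs = Optional.empty();
        } else {
            watermarkExprStrs = Optional.of(Collections.singletonList(watermarkExpr));
        }
        return watermarkExprStrs;
    }
}
```

The compiler rejects any path that skips the assignment or assigns twice, which is the guarantee we want for this variable.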

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
##########
@@ -56,4 +57,12 @@
      */
     @JsonIgnore
     Optional<RowType> getProducedType();
+
+    /**
+     * When Filter,Limit,Partition,etc. are pushed into {@link DynamicTableSource}, the source will
+     * add a digest to record this behavior.
+     *
+     * @param exprStrs The expressions need to be added into the extra digest.
+     */
+    String getExtraDigests(Optional<List<String>> exprStrs);

Review comment:
       I think this should return an array or a list. As it stands, this method takes on the formatting concern, which was previously kept separate.
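
To illustrate the separation, here is a sketch under assumptions: `DigestSource` is a hypothetical stand-in for the spec interface, and `DigestFormatter` is a hypothetical caller. The spec hands back the individual digests, and joining them into one string stays with the caller.

```java
import java.util.List;

// Hypothetical stand-in for the spec: returns digest entries, unformatted as a group.
interface DigestSource {
    List<String> getExtraDigests();
}

// Hypothetical caller that owns the formatting concern.
class DigestFormatter {
    static String join(DigestSource source) {
        // How the entries are separated is decided here, not in the spec.
        return String.join(", ", source.getExtraDigests());
    }
}
```

With this split, changing the separator or the surrounding layout does not require touching any spec implementation.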

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushWatermarkIntoTableSourceScanRuleBase.java
##########
@@ -109,18 +114,23 @@ protected FlinkLogicalTableSourceScan getNewScan(
                     new SourceWatermarkSpec(true, producedType);
             sourceWatermarkSpec.apply(newDynamicTableSource, abilityContext);
             abilitySpec = sourceWatermarkSpec;
+            watermarkExprStrs = Optional.empty();
         } else {
+            ArrayList<String> exprStrs = new ArrayList<>();

Review comment:
       Use `final List`

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoSourceScanRuleBase.java
##########
@@ -179,23 +186,22 @@ protected FlinkStatistic getNewFlinkStatistic(
         return newStatistic;
     }
 
-    protected String[] getNewExtraDigests(List<ResolvedExpression> acceptedFilters) {
-        final String extraDigest;
+    protected String convertExpressionToString(List<ResolvedExpression> acceptedFilters) {
+        final String pushedExpr;
+        final String pushedExpr;
         if (!acceptedFilters.isEmpty()) {

Review comment:
       This `if` isn't necessary; we can also reduce over an empty list.
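
A sketch of what I mean, with assumptions: plain strings stand in for the summary strings of the `ResolvedExpression`s, the `and(%s, %s)` format is only illustrative, and falling back to an empty string is my guess at the desired result for no filters. `Stream.reduce` with a single accumulator returns an `Optional`, which is simply empty for an empty list.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper; strings stand in for expression summaries.
class FilterSummary {
    static String summarize(List<String> exprs) {
        return exprs.stream()
                // Combine pairwise; never invoked for empty or single-element lists.
                .reduce((l, r) -> String.format("and(%s, %s)", l, r))
                // An empty list yields Optional.empty(), so no explicit emptiness check is needed.
                .orElse("");
    }

    public static void main(String[] args) {
        System.out.println(summarize(Collections.emptyList()));
        System.out.println(summarize(Arrays.asList("a > 1", "b < 2")));
    }
}
```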

##########
File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java
##########
@@ -56,4 +57,12 @@
      */
     @JsonIgnore
     Optional<RowType> getProducedType();
+
+    /**
+     * When Filter,Limit,Partition,etc. are pushed into {@link DynamicTableSource}, the source will
+     * add a digest to record this behavior.
+     *
+     * @param exprStrs The expressions need to be added into the extra digest.
+     */
+    String getExtraDigests(Optional<List<String>> exprStrs);

Review comment:
       Having to pass an argument seems to violate the intention of the change. If the spec is responsible for generating the digest, it shouldn't need additional information to do so.
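
Roughly what I would expect instead, as a sketch: the spec derives its digests from the state it already holds. `FilterSpecSketch` is hypothetical and is not the real `FilterPushDownSpec`; the `filter=[...]` format and the use of plain strings for predicates are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical spec that already holds everything needed for its digest.
final class FilterSpecSketch {
    private final List<String> predicateSummaries;

    FilterSpecSketch(List<String> predicateSummaries) {
        // Defensive copy so the digest cannot change after construction.
        this.predicateSummaries =
                Collections.unmodifiableList(new ArrayList<>(predicateSummaries));
    }

    // No parameters: the digest is built purely from the spec's own fields.
    List<String> getExtraDigests() {
        if (predicateSummaries.isEmpty()) {
            return Collections.emptyList();
        }
        return Collections.singletonList(
                "filter=[" + String.join(", ", predicateSummaries) + "]");
    }
}
```

The rule then only constructs the spec and asks it for its digests, without computing expression strings on the spec's behalf.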




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
