swuferhong commented on code in PR #23313:
URL: https://github.com/apache/flink/pull/23313#discussion_r1323892173


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala:
##########
@@ -395,9 +397,19 @@ class RexNodeToExpressionConverter(
     inputNames: Array[String],
     functionCatalog: FunctionCatalog,
     catalogManager: CatalogManager,
-    timeZone: TimeZone)
+    timeZone: TimeZone,
+    relDataType: Option[RelDataType] = None)
   extends RexVisitor[Option[ResolvedExpression]] {
 
+  def this(
+      rexBuilder: RexBuilder,
+      inputNames: Array[String],
+      functionCatalog: FunctionCatalog,
+      catalogManager: CatalogManager,
+      timeZone: TimeZone) = {
+    this(rexBuilder, inputNames, functionCatalog, catalogManager, timeZone, 
null)

Review Comment:
   There is no need to add `null`.  `this(rexBuilder, inputNames, 
functionCatalog, catalogManager, timeZone)` is ok.



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A reference to a nested field in an input. The reference contains.

Review Comment:
   `contains.` ->  `contains:`



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/NestedFieldReferenceExpression.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * A reference to a nested field in an input. The reference contains.
+ *
+ * <ul>
+ *   <li>nested field names to traverse from the top level column to the 
nested leaf column.
+ *   <li>nested field indices to traverse from the top level column to the 
nested leaf column.
+ *   <li>type
+ * </ul>
+ */
+@PublicEvolving
+public class NestedFieldReferenceExpression implements ResolvedExpression {
+
+    /** Nested field names to traverse from the top level column to the nested 
leaf column. */
+    private final String[] fieldNames;
+
+    /** Nested field index to traverse from the top level column to the nested 
leaf column. */
+    private final int[] fieldIndices;
+
+    private final DataType dataType;
+
+    public NestedFieldReferenceExpression(
+            String[] fieldNames, int[] fieldIndices, DataType dataType) {
+        this.fieldNames = fieldNames;
+        this.fieldIndices = fieldIndices;
+        this.dataType = dataType;
+    }
+
+    public String[] getFieldNames() {
+        return fieldNames;
+    }
+
+    public int[] getFieldIndices() {
+        return fieldIndices;
+    }
+
+    public String getName() {
+        return String.format(
+                "`%s`",
+                String.join(
+                        ".",
+                        Arrays.stream(fieldNames)
+                                .map(this::quoteIdentifier)
+                                .toArray(String[]::new)));
+    }
+
+    @Override
+    public DataType getOutputDataType() {
+        return dataType;
+    }
+
+    @Override
+    public List<ResolvedExpression> getResolvedChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public String asSummaryString() {
+        return getName();
+    }
+
+    @Override
+    public List<Expression> getChildren() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public <R> R accept(ExpressionVisitor<R> visitor) {
+        return visitor.visit(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        NestedFieldReferenceExpression that = (NestedFieldReferenceExpression) 
o;
+        return Arrays.equals(fieldNames, that.fieldNames)
+                && Arrays.equals(fieldIndices, that.fieldIndices)
+                && dataType.equals(that.dataType);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(fieldNames, fieldIndices, dataType);
+    }

Review Comment:
   Idea warning: Change `fieldName` to `Arrays.hashCode(fieldName)`,  change 
`fieldIndices`  to `Arrays.hashCode(fieldIndices)`



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java:
##########
@@ -118,4 +151,34 @@ public void testWithInterval() {
         util.tableEnv().executeSql(ddl);
         super.testWithInterval();
     }
+
+    @Test
+    public void testBasicNestedFilter() {
+        util.verifyRelPlan("SELECT * FROM NestedTable WHERE 
deepNested.nested1.`value` > 2");
+    }
+
+    @Test
+    public void testNestedFilterWithDotInTheName() {
+        util.verifyRelPlan(
+                "SELECT id FROM NestedTable WHERE 
`deepNestedWith.`.nested.`.value` > 5");
+    }
+
+    @Test
+    public void testNestedFilterWithBacktickInTheName() {
+        util.verifyRelPlan(
+                "SELECT id FROM NestedTable WHERE 
`deepNestedWith.`.nested.```name` = 'foo'");

Review Comment:
   One small question, is this the standard way of adding "`"?  I couldn't find 
the relevant documents.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java:
##########
@@ -86,6 +86,39 @@ public void setup() {
                         + ")";

Review Comment:
   Better to add some ITCase To verify the correctness of the data after push 
down



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to