swuferhong commented on code in PR #23316:
URL: https://github.com/apache/flink/pull/23316#discussion_r1308686777
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala:
##########
@@ -239,4 +239,62 @@ object FunctionCodeGenerator {
new GeneratedJoinCondition(funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
}
+
+ /**
+ * Generates a [[FilterCondition]] that can be passed to Java compiler.
+ *
+ * @param ctx
+ * The context of the code generator
+ * @param name
+ * Class name of the Function. Not must be unique but has to be a valid Java class identifier.
+ * @param bodyCode
+ * code contents of the SAM (Single Abstract Method).
+ * @param inputTerm
+ * the input term
+ * @return
+ * instance of GeneratedFilterCondition
+ */
+ def generateFilterCondition(
+ ctx: CodeGeneratorContext,
+ name: String,
+ bodyCode: String,
+ inputTerm: String = CodeGenUtils.DEFAULT_INPUT_TERM): GeneratedFilterCondition = {
Review Comment:
I think we could abstract a common `generateCondition` method and have
`generateJoinCondition` and `generateFilterCondition` delegate to it? Their
code differs only slightly.
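Just to illustrate the shape I have in mind -- purely schematic, with the
planner and runtime types stubbed out (the real SAM signatures and codegen
details differ), not a drop-in implementation:

```scala
// Schematic only: GenCtx / GeneratedCode stand in for CodeGeneratorContext
// and the GeneratedJoinCondition / GeneratedFilterCondition wrappers.
object ConditionCodeGenSketch {

  final case class GenCtx(references: Seq[AnyRef])
  final case class GeneratedCode(className: String, code: String, references: Seq[AnyRef])

  // shared template: only the implemented interface and its SAM signature differ
  private def generateCondition(
      ctx: GenCtx,
      name: String,
      samInterface: String,
      samSignature: String,
      bodyCode: String): GeneratedCode = {
    val code =
      s"""public class $name implements $samInterface {
         |  public $samSignature {
         |    $bodyCode
         |  }
         |}""".stripMargin
    GeneratedCode(name, code, ctx.references)
  }

  def generateJoinCondition(ctx: GenCtx, name: String, bodyCode: String): GeneratedCode =
    generateCondition(
      ctx, name, "JoinCondition", "boolean apply(RowData in1, RowData in2)", bodyCode)

  def generateFilterCondition(ctx: GenCtx, name: String, bodyCode: String): GeneratedCode =
    generateCondition(ctx, name, "FilterCondition", "boolean apply(RowData in)", bodyCode)
}
```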
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java:
##########
@@ -244,4 +244,29 @@ public void testJoinTemporalTableWithAsyncRetryHint2() {
+ "FROM MyTable AS T JOIN LookupTable "
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
}
+
+ @Test
+ public void testLeftJoinTemporalTableWithPreFilter() {
+ util.verifyJsonPlan(
+ "INSERT INTO MySink1 SELECT * "
+ + "FROM MyTable AS T LEFT JOIN LookupTable "
+ + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b > 'abc'");
+ }
+
+ @Test
+ public void testLeftJoinTemporalTableWithPostFilter() {
+ util.verifyJsonPlan(
+ "INSERT INTO MySink1 SELECT * "
+ + "FROM MyTable AS T LEFT JOIN LookupTable "
+ + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b > 'abc'");
+ }
Review Comment:
These two tests run exactly the same SQL, so the second one adds no extra
coverage. `testLeftJoinTemporalTableWithPostFilter` should use a condition
that cannot be applied before the lookup (e.g. one on a non-key field of
`D`), otherwise it does not actually exercise the post-filter path.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFilterCondition.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.generated;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Describes a generated {@link FilterCondition}. */
+public class GeneratedFilterCondition extends GeneratedFunction<FilterCondition> {
+
+ private static final long serialVersionUID = 1L;
+
+ @VisibleForTesting
+ public GeneratedFilterCondition(String className, String code, Object[] references) {
+ super(className, code, references, new Configuration());
+ }
+
Review Comment:
Consider adding harness tests to cover this constructor, similar to
`String2SortMergeJoinOperatorTest`.
##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFilterCondition.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.generated;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Describes a generated {@link FilterCondition}. */
+public class GeneratedFilterCondition extends GeneratedFunction<FilterCondition> {
+
+ private static final long serialVersionUID = 1L;
Review Comment:
Change `serialVersionUID` to `2L`, as the other `GeneratedXXX` classes do.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala:
##########
@@ -100,7 +100,8 @@ class StreamPhysicalLookupJoin(
new StreamExecLookupJoin(
tableConfig,
JoinTypeUtil.getFlinkJoinType(joinType),
- remainingCondition.orNull,
+ finalPreFilterCondition.orNull,
+ finalRemainingCondition.orNull,
Review Comment:
Why are both `finalPreFilterCondition` and `finalRemainingCondition` passed
with `orNull` here, while in `CommonExecLookupJoin` only `preFilterCondition`
is annotated with `@JsonInclude(NON_NULL)` and `remainingJoinCondition` is
not?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java:
##########
@@ -175,9 +177,14 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
@JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE)
private final @Nullable RexNode filterOnTemporalTable;
- /** join condition except equi-conditions extracted as lookup keys. */
- @JsonProperty(FIELD_NAME_JOIN_CONDITION)
- private final @Nullable RexNode joinCondition;
+ /** pre-filter condition on left input except lookup keys. */
+ @JsonProperty(FIELD_NAME_PRE_FILTER_CONDITION)
+ @JsonInclude(JsonInclude.Include.NON_NULL)
Review Comment:
Why is this `JsonInclude` needed? Please add an IT case and a UT case to
cover it.
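For context, the only observable effect in the compiled plan JSON is whether
a null field is written at all. A tiny standalone Jackson sketch (not Flink
code; the class and property names are just illustrative): the `NON_NULL`
property disappears when null, while the un-annotated one is still serialized
as `null`. Whichever way we decide, `preFilterCondition` and
`remainingJoinCondition` should probably be treated consistently.

```scala
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.databind.ObjectMapper

class Node(preFilter: String, remaining: String) {
  // dropped from the JSON entirely when null
  @JsonInclude(JsonInclude.Include.NON_NULL)
  def getPreFilterCondition: String = preFilter

  // still written as "remainingJoinCondition": null
  def getRemainingJoinCondition: String = remaining
}

object JsonIncludeDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    // prints {"remainingJoinCondition":null}
    println(mapper.writeValueAsString(new Node(null, null)))
  }
}
```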
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java:
##########
@@ -301,8 +301,10 @@ public StreamPhysicalRel visit(
// required determinism cannot be satisfied even upsert materialize was enabled if:
// 1. remaining join condition contains non-deterministic call
- JavaScalaConversionUtil.toJava(lookupJoin.remainingCondition())
- .ifPresent(condi -> checkNonDeterministicCondition(condi, lookupJoin));
+ JavaScalaConversionUtil.toJava(lookupJoin.finalPreFilterCondition())
+ .ifPresent(cond -> checkNonDeterministicCondition(cond, lookupJoin));
Review Comment:
Could you add tests in `NonDeterministicDagTest.scala` to cover this?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -98,11 +98,11 @@ abstract class CommonPhysicalLookupJoin(
// all potential index keys, mapping from field index in table source to LookupKey
analyzeLookupKeys(cluster.getRexBuilder, joinKeyPairs, calcOnTemporalTable)
}
- // remaining condition used to filter the joined records (left input record X lookup-ed records)
- val remainingCondition: Option[RexNode] = getRemainingJoinCondition(
+ // split remaining condition into pre-filter(used to filter the left input before lookup) and
+ // remaining parts(used to filter the joined records)
+ val (finalPreFilterCondition, finalRemainingCondition) = splitRemainingJoinCondition(
Review Comment:
Maybe we can rename this method: `splitRemainingJoinCondition` reads as if it
splits the `finalRemainingCondition`, rather than splitting the join
condition into a pre-filter part and a remaining part. Something like
`splitConditionIntoPreFilterAndRemaining` might be clearer.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala:
##########
@@ -244,9 +243,31 @@ abstract class CommonPhysicalLookupJoin(
val rightInputRef = new RexInputRef(rightIndex, rightFieldType)
rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, leftInputRef, rightInputRef)
}
- val remainingAnds = remainingEquals ++ joinInfo.nonEquiConditions.asScala
- // build a new condition
- val condition = RexUtil.composeConjunction(rexBuilder, remainingAnds.toList.asJava)
+ if (joinType.generatesNullsOnRight) {
+ // only extract pre-filter for left & full outer joins(otherwise the pre-filter will always be pushed down)
+ val (leftLocal, remaining) =
+ joinInfo.nonEquiConditions.asScala.partition {
+ r =>
+ {
+ val inputRefs = new InputRefVisitor()
+ r.accept(inputRefs)
+ // if all input refs must from left
Review Comment:
Please rewrite this comment; "if all input refs must from left" is hard to
parse. Something like "the condition can be used as a pre-filter only if all
of its input refs come from the left input" would read better.
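Roughly what the check amounts to, as a schematic only (`Cond` here just
stands in for a `RexNode` together with the field indices it references; the
PR's actual predicate may differ):

```scala
object PreFilterSplitSketch {
  // Cond stands in for a RexNode plus the input fields it references
  final case class Cond(inputRefs: Set[Int])

  // a condition qualifies as a pre-filter on the left input only if every
  // field it references comes from the left side of the join
  def splitIntoPreFilterAndRemaining(
      nonEquiConditions: Seq[Cond],
      leftFieldCount: Int): (Seq[Cond], Seq[Cond]) =
    nonEquiConditions.partition(c => c.inputRefs.forall(_ < leftFieldCount))
}
```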