lincoln-lil commented on code in PR #23316:
URL: https://github.com/apache/flink/pull/23316#discussion_r1308826720

########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/generated/GeneratedFilterCondition.java: ##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.generated;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ReadableConfig;
+
+/** Describes a generated {@link FilterCondition}. */
+public class GeneratedFilterCondition extends GeneratedFunction<FilterCondition> {
+
+ private static final long serialVersionUID = 1L;

Review Comment:
Per the Flink code guidelines, every new serializable class should start its `serialVersionUID` at 1L. The existing `GeneratedXXX` classes use 2L because they were modified after their introduction and their UIDs were incremented.

########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java: ##########
@@ -301,8 +301,10 @@ public StreamPhysicalRel visit(
  // required determinism cannot be satisfied even upsert materialize was enabled if:
  // 1. remaining join condition contains non-deterministic call
- JavaScalaConversionUtil.toJava(lookupJoin.remainingCondition())
- .ifPresent(condi -> checkNonDeterministicCondition(condi, lookupJoin));
+ JavaScalaConversionUtil.toJava(lookupJoin.finalPreFilterCondition())
+ .ifPresent(cond -> checkNonDeterministicCondition(cond, lookupJoin));

Review Comment:
I will add a test case, `testCdcLeftJoinDimWithNonDeterministicPreFilter`, to cover this.

########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCodeGenerator.scala: ##########
@@ -239,4 +239,62 @@ object FunctionCodeGenerator {
  new GeneratedJoinCondition(funcName, funcCode, ctx.references.toArray, ctx.tableConfig)
  }
+
+ /**
+ * Generates a [[FilterCondition]] that can be passed to Java compiler.
+ *
+ * @param ctx
+ * The context of the code generator
+ * @param name
+ * Class name of the Function. Not must be unique but has to be a valid Java class identifier.
+ * @param bodyCode
+ * code contents of the SAM (Single Abstract Method).
+ * @param inputTerm
+ * the input term
+ * @return
+ * instance of GeneratedFilterCondition
+ */
+ def generateFilterCondition(
+ ctx: CodeGeneratorContext,
+ name: String,
+ bodyCode: String,
+ inputTerm: String = CodeGenUtils.DEFAULT_INPUT_TERM): GeneratedFilterCondition = {

Review Comment:
Makes sense; the function class can be reused.

########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalLookupJoin.scala: ##########
@@ -100,7 +100,8 @@ class StreamPhysicalLookupJoin(
  new StreamExecLookupJoin(
  tableConfig,
  JoinTypeUtil.getFlinkJoinType(joinType),
- remainingCondition.orNull,
+ finalPreFilterCondition.orNull,
+ finalRemainingCondition.orNull,

Review Comment:
For compatibility reasons, I left the original `finalRemainingCondition` unchanged (that is, I did not also add a `NON_NULL` JSON annotation to it), but the newly added `finalPreFilterCondition` can be tagged safely.
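Regarding the `serialVersionUID` guideline in the `GeneratedFilterCondition` comment, here is a minimal sketch, using only `java.io.ObjectStreamClass` from the JDK, that reads back the UID Java serialization will record for a class. `SerialVersionDemo` and `NewGeneratedThing` are hypothetical stand-ins, not Flink classes.

```java
import java.io.ObjectStreamClass;
import java.io.Serializable;

public final class SerialVersionDemo {

    /** Hypothetical newly added serializable class: its serialVersionUID starts at 1L. */
    static final class NewGeneratedThing implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String className;

        NewGeneratedThing(String className) {
            this.className = className;
        }

        String getClassName() {
            return className;
        }
    }

    /** Returns the serialVersionUID that Java serialization records for {@code clazz}. */
    static long uidOf(Class<? extends Serializable> clazz) {
        return ObjectStreamClass.lookup(clazz).getSerialVersionUID();
    }

    public static void main(String[] args) {
        System.out.println(uidOf(NewGeneratedThing.class)); // prints 1
    }
}
```

Declaring the UID explicitly keeps it stable across recompilation; without it, the JVM derives one from the class structure, so unrelated edits could break deserialization of previously serialized instances.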

########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/common/CommonPhysicalLookupJoin.scala: ##########
@@ -98,11 +98,11 @@ abstract class CommonPhysicalLookupJoin(
  // all potential index keys, mapping from field index in table source to LookupKey
  analyzeLookupKeys(cluster.getRexBuilder, joinKeyPairs, calcOnTemporalTable)
  }
- // remaining condition used to filter the joined records (left input record X lookup-ed records)
- val remainingCondition: Option[RexNode] = getRemainingJoinCondition(
+ // split remaining condition into pre-filter(used to filter the left input before lookup) and
+ // remaining parts(used to filter the joined records)
+ val (finalPreFilterCondition, finalRemainingCondition) = splitRemainingJoinCondition(

Review Comment:
Should we simplify the name to just `splitJoinCondition`?

########## flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/LookupJoinJsonPlanTest.java: ##########
@@ -244,4 +244,29 @@ public void testJoinTemporalTableWithAsyncRetryHint2() {
  + "FROM MyTable AS T JOIN LookupTable "
  + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id");
  }
+
+ @Test
+ public void testLeftJoinTemporalTableWithPreFilter() {
+ util.verifyJsonPlan(
+ "INSERT INTO MySink1 SELECT * "
+ + "FROM MyTable AS T LEFT JOIN LookupTable "
+ + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b > 'abc'");
+ }
+
+ @Test
+ public void testLeftJoinTemporalTableWithPostFilter() {
+ util.verifyJsonPlan(
+ "INSERT INTO MySink1 SELECT * "
+ + "FROM MyTable AS T LEFT JOIN LookupTable "
+ + "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id AND T.b > 'abc'");
+ }

Review Comment:
Good catch!
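To make the split described in the `CommonPhysicalLookupJoin` hunk concrete, here is a minimal sketch of how a left lookup join might apply the two parts, assuming standard LEFT JOIN semantics: a left row that fails the pre-filter (a condition on left columns only, such as `T.b > 'abc'` in the new tests) skips the lookup and is emitted padded with nulls, while the remaining condition filters each (left, looked-up) pair. All names here (`PreFilterLookupJoinSketch`, `Joined`, `leftLookupJoin`) are hypothetical and are not Flink's runtime classes; rows are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Predicate;

public final class PreFilterLookupJoinSketch {

    /** One output row; {@code right} is null when the left row is padded with nulls. */
    static final class Joined {
        final String left;
        final String right;

        Joined(String left, String right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return left + "|" + right;
        }
    }

    /** Hypothetical left-outer lookup join with a pre-filter and a remaining condition. */
    static List<Joined> leftLookupJoin(
            List<String> leftRows,
            Function<String, List<String>> lookup,
            Predicate<String> preFilter,
            BiPredicate<String, String> remaining) {
        List<Joined> out = new ArrayList<>();
        for (String left : leftRows) {
            if (!preFilter.test(left)) {
                // LEFT JOIN semantics: the row survives, padded with nulls, and no lookup is issued.
                out.add(new Joined(left, null));
                continue;
            }
            boolean matched = false;
            for (String right : lookup.apply(left)) {
                // The remaining condition is only evaluated on joined (left, right) pairs.
                if (remaining.test(left, right)) {
                    out.add(new Joined(left, right));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new Joined(left, null));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical dimension table keyed by the first character of the left row.
        Map<String, List<String>> dim = Map.of("a", List.of("a-dim"), "b", List.of("b-dim"));
        List<Joined> result =
                leftLookupJoin(
                        List.of("a1", "b1", "c1"),
                        left -> dim.getOrDefault(left.substring(0, 1), List.of()),
                        left -> !left.startsWith("b"), // pre-filter on left columns only
                        (left, right) -> true); // no remaining condition
        System.out.println(result); // [a1|a-dim, b1|null, c1|null]
    }
}
```

Note that `b1` would have matched `b-dim`, but the pre-filter rejects it before any lookup happens, which is the point of evaluating left-only conditions early.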

########## flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecLookupJoin.java: ##########
@@ -175,9 +177,14 @@ public abstract class CommonExecLookupJoin extends ExecNodeBase<RowData> {
  @JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE)
  private final @Nullable RexNode filterOnTemporalTable;
- /** join condition except equi-conditions extracted as lookup keys. */
- @JsonProperty(FIELD_NAME_JOIN_CONDITION)
- private final @Nullable RexNode joinCondition;
+ /** pre-filter condition on left input except lookup keys. */
+ @JsonProperty(FIELD_NAME_PRE_FILTER_CONDITION)
+ @JsonInclude(JsonInclude.Include.NON_NULL)

Review Comment:
The pre-filter condition is a nullable attribute, so we can omit it from the JSON plan when it is null; this does not affect serialization or deserialization. This is already covered by `LookupJoinJsonPlanTest`, but I agree that we should also add a case to `LookupJoinJsonPlanITCase`.
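The claim that omitting a null pre-filter does not affect (de)serialization can be illustrated without Jackson. The sketch below is a hypothetical map-based stand-in, not Flink's JSON plan serializer: the writer skips a null pre-filter (mimicking what `@JsonInclude(JsonInclude.Include.NON_NULL)` does for the real field) but always writes the older join-condition field, and the reader treats a missing key like an explicit null. The property names `preFilterCondition` and `joinCondition` are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public final class NonNullFieldSketch {

    // Assumed JSON property names, for illustration only.
    static final String PRE_FILTER = "preFilterCondition";
    static final String JOIN_CONDITION = "joinCondition";

    /** Hypothetical plan node with two nullable conditions (strings here, not RexNodes). */
    static final class LookupNode {
        final String preFilterCondition; // omitted from JSON when null, NON_NULL-style
        final String joinCondition; // always written, even when null

        LookupNode(String preFilterCondition, String joinCondition) {
            this.preFilterCondition = preFilterCondition;
            this.joinCondition = joinCondition;
        }
    }

    static Map<String, Object> write(LookupNode node) {
        Map<String, Object> json = new LinkedHashMap<>();
        if (node.preFilterCondition != null) {
            json.put(PRE_FILTER, node.preFilterCondition);
        }
        json.put(JOIN_CONDITION, node.joinCondition); // an explicit null is kept
        return json;
    }

    static LookupNode read(Map<String, Object> json) {
        // In this sketch a missing key and an explicit null both read back as null,
        // so a plan written without the pre-filter key still deserializes.
        return new LookupNode((String) json.get(PRE_FILTER), (String) json.get(JOIN_CONDITION));
    }

    public static void main(String[] args) {
        Map<String, Object> json = write(new LookupNode(null, "joinCond"));
        System.out.println(json); // {joinCondition=joinCond}
        System.out.println(read(json).preFilterCondition == null); // true
    }
}
```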
