Au-Miner commented on code in PR #26649:
URL: https://github.com/apache/flink/pull/26649#discussion_r2149588701
##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/DeltaJoinTest.scala:
##########
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.planner.plan.batch.sql
+
+import org.apache.flink.table.api.{DataTypes, ExplainDetail, Schema}
+import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions}
+import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
+import org.apache.flink.table.api.config.OptimizerConfigOptions.DeltaJoinStrategy
+import org.apache.flink.table.catalog.{CatalogTable, ObjectPath, ResolvedCatalogTable}
+import org.apache.flink.table.planner.JMap
+import org.apache.flink.table.planner.utils.{TableTestBase, TestingTableEnvironment}
+
+import org.assertj.core.api.Assertions.assertThatThrownBy
+import org.junit.jupiter.api.{BeforeEach, Test}
+
+import java.util.{Collections, HashMap => JHashMap}
+
+/** Test for delta join. */
+class DeltaJoinTest extends TableTestBase {
+
+  private val util = streamTestUtil()
+  private val tEnv: TestingTableEnvironment = util.tableEnv.asInstanceOf[TestingTableEnvironment]
+
+  private val testComment = "test comment"
+  private val testValuesTableOptions: JMap[String, String] = {
+    val options = new JHashMap[String, String]()
+    options.put("connector", "values")
+    options.put("bounded", "false")
+    options.put("sink-insert-only", "false")
+    options.put("sink-changelog-mode-enforced", "I,UA,UB,D")
+    options.put("async", "true")
+    options
+  }
+
+  @BeforeEach
+  def setup(): Unit = {
+    util.tableConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_DELTA_JOIN_STRATEGY,
+      OptimizerConfigOptions.DeltaJoinStrategy.AUTO)
+
+    addTable(
+      "src1",
+      Schema
+        .newBuilder()
+        .column("a0", DataTypes.INT.notNull)
+        .column("a1", DataTypes.DOUBLE.notNull)
+        .column("a2", DataTypes.STRING)
+        .column("a3", DataTypes.INT)
+        .primaryKey("a0")
+        .index("a1", "a2")
+        .build()
+    )
+
+    addTable(
+      "src2",
+      Schema
+        .newBuilder()
+        .column("b0", DataTypes.INT.notNull)
+        .column("b2", DataTypes.STRING)
+        .column("b1", DataTypes.DOUBLE)
+        .primaryKey("b0")
+        .index("b2")
+        .build()
+    )
+
+    addTable(
+      "snk",
+      Schema
+        .newBuilder()
+        .column("l0", DataTypes.INT.notNull)
+        .column("l1", DataTypes.DOUBLE)
+        .column("l2", DataTypes.STRING)
+        .column("l3", DataTypes.INT)
+        .column("r0", DataTypes.INT.notNull)
+        .column("r2", DataTypes.STRING)
+        .column("r1", DataTypes.DOUBLE)
+        .primaryKey("l0", "r0")
+        .build()
+    )
+  }
+
+  @Test
+  def testJoinKeysContainIndexOnBothSide(): Unit = {
+    util.verifyRelPlanInsert(
+      "insert into snk select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2")
+  }
+
+  @Test
+  def testJoinKeysContainIndexOnBothSide2(): Unit = {
+    util.verifyRelPlanInsert(
+      "insert into snk select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2 " +
+        "and src1.a3 = src2.b0")
+  }
+
+  @Test
+  def testJoinKeysNotContainIndexOnOneSide(): Unit = {
+    util.verifyRelPlanInsert(
+      "insert into snk select * from src1 join src2 " +
+        "on src1.a2 = src2.b2")
+  }
+
+  @Test
+  def testWithNonEquiCondition1(): Unit = {
+    util.verifyRelPlanInsert(
+      "insert into snk select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2 " +
+        "and src2.b0 > src1.a0")
+  }
+
+  @Test
+  def testWithNonEquiCondition2(): Unit = {
+    // could not optimize into delta join because there is a calc between join and source
+    util.verifyRelPlanInsert(
+      "insert into snk select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2 " +
+        "and src2.b0 > src1.a0 " +
+        "and src2.b2 <> 'Hello' " +
+        "and src1.a0 > 99")
+  }
+
+  @Test
+  def testJsonPlanWithTableHints(): Unit = {
+    util.verifyJsonPlan(
+      "insert into snk select * from src1 /*+ OPTIONS('failing-source'='true') */" +
+        "join src2 /*+ OPTIONS('failing-source'='true') */" +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2 " +
+        "and src2.b0 > src1.a0")
+  }
+
+  @Test
+  def testProjectFieldsBeforeJoin(): Unit = {
+    // could not optimize into delta join because the source has ProjectPushDownSpec
+    util.verifyRelPlanInsert(
+      "insert into snk(l0, l1, l2, r0, r2, r1) " +
+        "select * from ( " +
+        " select a0, a1, a2 from src1" +
+        ") tmp join src2 " +
+        "on tmp.a1 = src2.b1 " +
+        "and tmp.a2 = src2.b2")
+  }
+
+  @Test
+  def testProjectFieldsAfterJoin(): Unit = {
+    util.verifyRelPlanInsert(
+      "insert into snk select a0, a1 + 1.1, a2, a3, b0, b2, b1 from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2")
+  }
+
+  @Test
+  def testFilterFieldsBeforeJoin(): Unit = {
+    // could not optimize into delta join because there is a calc between source and join
+    util.verifyRelPlanInsert(
+      "insert into snk select * from ( " +
+        " select * from src1 where a1 > 1.1 " +
+        ") tmp join src2 " +
+        "on tmp.a1 = src2.b1 " +
+        "and tmp.a2 = src2.b2")
+  }
+
+  @Test
+  def testFilterFieldsAfterJoin(): Unit = {
+    util.verifyRelPlanInsert(
+      "insert into snk select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2 " +
+        "where a3 > b0")
+  }
+
+  @Test
+  def testMultiRootsWithReusingJoinView(): Unit = {
+    util.tableConfig.set(
+      OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_OPTIMIZE_BLOCK_WITH_DIGEST_ENABLED,
+      Boolean.box(true))
+
+    util.tableEnv.executeSql("create table snk2 like snk")
+
+    util.tableEnv.executeSql(
+      "create temporary view mv as " +
+        "select * from src1 join src2 " +
+        "on src1.a1 = src2.b1 " +
+        "and src1.a2 = src2.b2")
+
+    val stmt = tEnv.createStatementSet()
+    stmt.addInsertSql("insert into snk select * from mv")
+    stmt.addInsertSql("insert into snk2 select * from mv")
+
+    util.verifyRelPlan(stmt)

Review Comment:
   Have Replaced.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use
the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org