This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 20a328d80a1dbc50974cf3de9f4b6178246f6dee
Author: Jim Hughes <jhug...@confluent.io>
AuthorDate: Tue Dec 12 14:04:48 2023 -0500

    [FLINK-33767] Deleting TemporalJoinJsonPlanTest.java and 
TemporalJoinJsonPlanITCase.java
---
 .../exec/stream/TemporalJoinJsonPlanTest.java      | 101 -----
 .../jsonplan/TemporalJoinJsonPlanITCase.java       | 107 ------
 .../testJoinTemporalFunction.out                   | 421 ---------------------
 .../testTemporalTableJoin.json                     | 421 ---------------------
 .../testTemporalTableJoin.out                      | 421 ---------------------
 5 files changed, 1471 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java
deleted file mode 100644
index da3e6eaebae..00000000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.plan.nodes.exec.stream;
-
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.functions.TemporalTableFunction;
-import org.apache.flink.table.planner.utils.StreamTableTestUtil;
-import org.apache.flink.table.planner.utils.TableTestBase;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import static org.apache.flink.table.api.Expressions.$;
-
-/** Test json serialization/deserialization for TemporalJoin. */
-class TemporalJoinJsonPlanTest extends TableTestBase {
-
-    private StreamTableTestUtil util;
-    private TableEnvironment tEnv;
-
-    @BeforeEach
-    void setup() {
-        util = streamTestUtil(TableConfig.getDefault());
-        tEnv = util.getTableEnv();
-
-        tEnv.executeSql(
-                "CREATE TABLE Orders (\n"
-                        + " amount INT,\n"
-                        + " currency STRING,\n"
-                        + " rowtime TIMESTAMP(3),\n"
-                        + " proctime AS PROCTIME(),\n"
-                        + " WATERMARK FOR rowtime AS rowtime\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values'\n"
-                        + ")");
-        tEnv.executeSql(
-                "CREATE TABLE RatesHistory (\n"
-                        + " currency STRING,\n"
-                        + " rate INT,\n"
-                        + " rowtime TIMESTAMP(3),\n"
-                        + " WATERMARK FOR rowtime AS rowtime,\n"
-                        + " PRIMARY KEY(currency) NOT ENFORCED\n"
-                        + ") WITH (\n"
-                        + " 'connector' = 'values'\n"
-                        + ")");
-        TemporalTableFunction ratesHistory =
-                
tEnv.from("RatesHistory").createTemporalTableFunction($("rowtime"), 
$("currency"));
-        tEnv.createTemporarySystemFunction("Rates", ratesHistory);
-    }
-
-    @Test
-    void testJoinTemporalFunction() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a int\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "INSERT INTO MySink "
-                        + "SELECT amount * r.rate "
-                        + "FROM Orders AS o,  "
-                        + "LATERAL TABLE (Rates(o.rowtime)) AS r "
-                        + "WHERE o.currency = r.currency ");
-    }
-
-    @Test
-    void testTemporalTableJoin() {
-        String sinkTableDdl =
-                "CREATE TABLE MySink (\n"
-                        + "  a int\n"
-                        + ") with (\n"
-                        + "  'connector' = 'values',\n"
-                        + "  'table-sink-class' = 'DEFAULT')";
-        tEnv.executeSql(sinkTableDdl);
-        util.verifyJsonPlan(
-                "INSERT INTO MySink "
-                        + "SELECT amount * r.rate "
-                        + "FROM Orders AS o  "
-                        + "JOIN RatesHistory  FOR SYSTEM_TIME AS OF o.rowtime 
AS r "
-                        + "ON o.currency = r.currency ");
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
deleted file mode 100644
index ca1ed35f050..00000000000
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/TemporalJoinJsonPlanITCase.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.runtime.stream.jsonplan;
-
-import org.apache.flink.table.functions.TemporalTableFunction;
-import org.apache.flink.table.planner.factories.TestValuesTableFactory;
-import org.apache.flink.table.planner.utils.JsonPlanTestBase;
-import org.apache.flink.types.Row;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.flink.table.api.Expressions.$;
-
-/** Test for TemporalJoin json plan. */
-class TemporalJoinJsonPlanITCase extends JsonPlanTestBase {
-
-    @BeforeEach
-    @Override
-    protected void setup() throws Exception {
-        super.setup();
-        List<Row> orders =
-                Arrays.asList(
-                        Row.of(2L, "Euro", 2L),
-                        Row.of(1L, "US Dollar", 3L),
-                        Row.of(50L, "Yen", 4L),
-                        Row.of(3L, "Euro", 5L));
-        createTestValuesSourceTable(
-                "Orders",
-                orders,
-                "amount bigint",
-                "currency STRING",
-                "order_time bigint",
-                "rowtime as TO_TIMESTAMP(FROM_UNIXTIME(order_time)) ",
-                "proctime as PROCTIME()",
-                "WATERMARK FOR rowtime AS rowtime");
-        List<Row> ratesHistory =
-                Arrays.asList(
-                        Row.of("US Dollar", 102L, 1L),
-                        Row.of("Euro", 114L, 1L),
-                        Row.of("Yen", 1L, 1L),
-                        Row.of("Euro", 116L, 5L),
-                        Row.of("Euro", 119L, 7L));
-        createTestValuesSourceTable(
-                "RatesHistory",
-                ratesHistory,
-                "currency STRING",
-                "rate bigint",
-                "rate_time bigint",
-                "rowtime as TO_TIMESTAMP(FROM_UNIXTIME(rate_time)) ",
-                "proctime as PROCTIME()",
-                "WATERMARK FOR rowtime AS rowtime",
-                "PRIMARY KEY(currency) NOT ENFORCED");
-
-        TemporalTableFunction temporalTableFunction =
-                tableEnv.from("RatesHistory")
-                        .createTemporalTableFunction($("rowtime"), 
$("currency"));
-        tableEnv.createTemporarySystemFunction("Rates", temporalTableFunction);
-        createTestValuesSinkTable("MySink", "amount bigint");
-    }
-
-    /** test process time inner join. * */
-    @Test
-    void testJoinTemporalFunction() throws Exception {
-        compileSqlAndExecutePlan(
-                        "INSERT INTO MySink "
-                                + "SELECT amount * r.rate "
-                                + "FROM Orders AS o,  "
-                                + "LATERAL TABLE (Rates(o.rowtime)) AS r "
-                                + "WHERE o.currency = r.currency ")
-                .await();
-        List<String> expected = Arrays.asList("+I[102]", "+I[228]", "+I[348]", 
"+I[50]");
-        assertResult(expected, 
TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-
-    @Test
-    void testTemporalTableJoin() throws Exception {
-        compileSqlAndExecutePlan(
-                        "INSERT INTO MySink "
-                                + "SELECT amount * r.rate "
-                                + "FROM Orders AS o  "
-                                + "JOIN RatesHistory  FOR SYSTEM_TIME AS OF 
o.rowtime AS r "
-                                + "ON o.currency = r.currency ")
-                .await();
-        List<String> expected = Arrays.asList("+I[102]", "+I[228]", "+I[348]", 
"+I[50]");
-        assertResult(expected, 
TestValuesTableFactory.getResultsAsStrings("MySink"));
-    }
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testJoinTemporalFunction.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testJoinTemporalFunction.out
deleted file mode 100644
index 9122979c553..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testJoinTemporalFunction.out
+++ /dev/null
@@ -1,421 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`Orders`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "amount",
-              "dataType" : "INT"
-            }, {
-              "name" : "currency",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "rowtime",
-              "dataType" : {
-                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-                "precision" : 3,
-                "kind" : "ROWTIME"
-              }
-            }, {
-              "name" : "proctime",
-              "kind" : "COMPUTED",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "CALL",
-                  "internalName" : "$PROCTIME$1",
-                  "operands" : [ ],
-                  "type" : {
-                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
-                    "nullable" : false,
-                    "precision" : 3,
-                    "kind" : "PROCTIME"
-                  }
-                },
-                "serializableString" : "PROCTIME()"
-              }
-            } ],
-            "watermarkSpecs" : [ {
-              "rowtimeAttribute" : "rowtime",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "INPUT_REF",
-                  "inputIndex" : 2,
-                  "type" : "TIMESTAMP(3)"
-                },
-                "serializableString" : "`rowtime`"
-              }
-            } ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`amount` INT, `currency` VARCHAR(2147483647), 
`rowtime` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, Orders]], fields=[amount, currency, rowtime])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-watermark-assigner_1",
-    "watermarkExpr" : {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "TIMESTAMP(3)"
-    },
-    "rowtimeFieldIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "amount",
-        "fieldType" : "INT"
-      }, {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "amount",
-        "fieldType" : "INT"
-      }, {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[currency]])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "currency",
-              "dataType" : "VARCHAR(2147483647) NOT NULL"
-            }, {
-              "name" : "rate",
-              "dataType" : "INT"
-            }, {
-              "name" : "rowtime",
-              "dataType" : {
-                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-                "precision" : 3,
-                "kind" : "ROWTIME"
-              }
-            } ],
-            "watermarkSpecs" : [ {
-              "rowtimeAttribute" : "rowtime",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "INPUT_REF",
-                  "inputIndex" : 2,
-                  "type" : "TIMESTAMP(3)"
-                },
-                "serializableString" : "`rowtime`"
-              }
-            } ],
-            "primaryKey" : {
-              "name" : "PK_currency",
-              "type" : "PRIMARY_KEY",
-              "columns" : [ "currency" ]
-            }
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` INT, 
`rowtime` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rowtime])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-watermark-assigner_1",
-    "watermarkExpr" : {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "TIMESTAMP(3)"
-    },
-    "rowtimeFieldIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647) NOT NULL"
-      }, {
-        "name" : "rate",
-        "fieldType" : "INT"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647) NOT NULL"
-      }, {
-        "name" : "rate",
-        "fieldType" : "INT"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[currency]])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-temporal-join_1",
-    "joinSpec" : {
-      "joinType" : "INNER",
-      "leftKeys" : [ 1 ],
-      "rightKeys" : [ 0 ],
-      "filterNulls" : [ true ],
-      "nonEquiCondition" : null
-    },
-    "isTemporalFunctionJoin" : true,
-    "leftTimeAttributeIndex" : 2,
-    "rightTimeAttributeIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    }, {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "amount",
-        "fieldType" : "INT"
-      }, {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      }, {
-        "name" : "currency0",
-        "fieldType" : "VARCHAR(2147483647) NOT NULL"
-      }, {
-        "name" : "rate",
-        "fieldType" : "INT"
-      }, {
-        "name" : "rowtime0",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "TemporalJoin(joinType=[InnerJoin], 
where=[(__TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0)) AND (currency = currency0))], 
select=[amount, currency, rowtime, currency0, rate, rowtime0])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "CALL",
-      "syntax" : "BINARY",
-      "internalName" : "$*$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "INT"
-      }, {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 4,
-        "type" : "INT"
-      } ],
-      "type" : "INT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`EXPR$0` INT>",
-    "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
-  }, {
-    "id" : 9,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "INT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`EXPR$0` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[EXPR$0])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 8,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json
deleted file mode 100644
index bbfd1524d3b..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.json
+++ /dev/null
@@ -1,421 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`Orders`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "amount",
-              "dataType" : "INT"
-            }, {
-              "name" : "currency",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "rowtime",
-              "dataType" : {
-                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-                "precision" : 3,
-                "kind" : "ROWTIME"
-              }
-            }, {
-              "name" : "proctime",
-              "kind" : "COMPUTED",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "CALL",
-                  "internalName" : "$PROCTIME$1",
-                  "operands" : [ ],
-                  "type" : {
-                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
-                    "nullable" : false,
-                    "precision" : 3,
-                    "kind" : "PROCTIME"
-                  }
-                },
-                "serializableString" : "PROCTIME()"
-              }
-            } ],
-            "watermarkSpecs" : [ {
-              "rowtimeAttribute" : "rowtime",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "INPUT_REF",
-                  "inputIndex" : 2,
-                  "type" : "TIMESTAMP(3)"
-                },
-                "serializableString" : "`rowtime`"
-              }
-            } ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`amount` INT, `currency` VARCHAR(2147483647), 
`rowtime` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, Orders]], fields=[amount, currency, rowtime])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-watermark-assigner_1",
-    "watermarkExpr" : {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "TIMESTAMP(3)"
-    },
-    "rowtimeFieldIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "amount",
-        "fieldType" : "INT"
-      }, {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "amount",
-        "fieldType" : "INT"
-      }, {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[currency]])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "currency",
-              "dataType" : "VARCHAR(2147483647) NOT NULL"
-            }, {
-              "name" : "rate",
-              "dataType" : "INT"
-            }, {
-              "name" : "rowtime",
-              "dataType" : {
-                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-                "precision" : 3,
-                "kind" : "ROWTIME"
-              }
-            } ],
-            "watermarkSpecs" : [ {
-              "rowtimeAttribute" : "rowtime",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "INPUT_REF",
-                  "inputIndex" : 2,
-                  "type" : "TIMESTAMP(3)"
-                },
-                "serializableString" : "`rowtime`"
-              }
-            } ],
-            "primaryKey" : {
-              "name" : "PK_currency",
-              "type" : "PRIMARY_KEY",
-              "columns" : [ "currency" ]
-            }
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` INT, 
`rowtime` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rowtime])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-watermark-assigner_1",
-    "watermarkExpr" : {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "TIMESTAMP(3)"
-    },
-    "rowtimeFieldIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647) NOT NULL"
-      }, {
-        "name" : "rate",
-        "fieldType" : "INT"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647) NOT NULL"
-      }, {
-        "name" : "rate",
-        "fieldType" : "INT"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[currency]])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-temporal-join_1",
-    "joinSpec" : {
-      "joinType" : "INNER",
-      "leftKeys" : [ 1 ],
-      "rightKeys" : [ 0 ],
-      "filterNulls" : [ true ],
-      "nonEquiCondition" : null
-    },
-    "isTemporalFunctionJoin" : false,
-    "leftTimeAttributeIndex" : 2,
-    "rightTimeAttributeIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    }, {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "amount",
-        "fieldType" : "INT"
-      }, {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      }, {
-        "name" : "currency0",
-        "fieldType" : "VARCHAR(2147483647) NOT NULL"
-      }, {
-        "name" : "rate",
-        "fieldType" : "INT"
-      }, {
-        "name" : "rowtime0",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "TemporalJoin(joinType=[InnerJoin], where=[((currency = 
currency0) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), 
__TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], 
select=[amount, currency, rowtime, currency0, rate, rowtime0])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "CALL",
-      "syntax" : "BINARY",
-      "internalName" : "$*$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "INT"
-      }, {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 4,
-        "type" : "INT"
-      } ],
-      "type" : "INT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`EXPR$0` INT>",
-    "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
-  }, {
-    "id" : 9,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "INT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`EXPR$0` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[EXPR$0])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 8,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.out
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.out
deleted file mode 100644
index bbfd1524d3b..00000000000
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/TemporalJoinJsonPlanTest_jsonplan/testTemporalTableJoin.out
+++ /dev/null
@@ -1,421 +0,0 @@
-{
-  "flinkVersion" : "",
-  "nodes" : [ {
-    "id" : 1,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`Orders`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "amount",
-              "dataType" : "INT"
-            }, {
-              "name" : "currency",
-              "dataType" : "VARCHAR(2147483647)"
-            }, {
-              "name" : "rowtime",
-              "dataType" : {
-                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-                "precision" : 3,
-                "kind" : "ROWTIME"
-              }
-            }, {
-              "name" : "proctime",
-              "kind" : "COMPUTED",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "CALL",
-                  "internalName" : "$PROCTIME$1",
-                  "operands" : [ ],
-                  "type" : {
-                    "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
-                    "nullable" : false,
-                    "precision" : 3,
-                    "kind" : "PROCTIME"
-                  }
-                },
-                "serializableString" : "PROCTIME()"
-              }
-            } ],
-            "watermarkSpecs" : [ {
-              "rowtimeAttribute" : "rowtime",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "INPUT_REF",
-                  "inputIndex" : 2,
-                  "type" : "TIMESTAMP(3)"
-                },
-                "serializableString" : "`rowtime`"
-              }
-            } ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`amount` INT, `currency` VARCHAR(2147483647), 
`rowtime` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, Orders]], fields=[amount, currency, rowtime])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 2,
-    "type" : "stream-exec-watermark-assigner_1",
-    "watermarkExpr" : {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "TIMESTAMP(3)"
-    },
-    "rowtimeFieldIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "amount",
-        "fieldType" : "INT"
-      }, {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
-  }, {
-    "id" : 3,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 1 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "amount",
-        "fieldType" : "INT"
-      }, {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[currency]])"
-  }, {
-    "id" : 4,
-    "type" : "stream-exec-table-source-scan_1",
-    "scanTableSource" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`RatesHistory`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "currency",
-              "dataType" : "VARCHAR(2147483647) NOT NULL"
-            }, {
-              "name" : "rate",
-              "dataType" : "INT"
-            }, {
-              "name" : "rowtime",
-              "dataType" : {
-                "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-                "precision" : 3,
-                "kind" : "ROWTIME"
-              }
-            } ],
-            "watermarkSpecs" : [ {
-              "rowtimeAttribute" : "rowtime",
-              "expression" : {
-                "rexNode" : {
-                  "kind" : "INPUT_REF",
-                  "inputIndex" : 2,
-                  "type" : "TIMESTAMP(3)"
-                },
-                "serializableString" : "`rowtime`"
-              }
-            } ],
-            "primaryKey" : {
-              "name" : "PK_currency",
-              "type" : "PRIMARY_KEY",
-              "columns" : [ "currency" ]
-            }
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values"
-          }
-        }
-      }
-    },
-    "outputType" : "ROW<`currency` VARCHAR(2147483647) NOT NULL, `rate` INT, 
`rowtime` TIMESTAMP(3)>",
-    "description" : "TableSourceScan(table=[[default_catalog, 
default_database, RatesHistory]], fields=[currency, rate, rowtime])",
-    "inputProperties" : [ ]
-  }, {
-    "id" : 5,
-    "type" : "stream-exec-watermark-assigner_1",
-    "watermarkExpr" : {
-      "kind" : "INPUT_REF",
-      "inputIndex" : 2,
-      "type" : "TIMESTAMP(3)"
-    },
-    "rowtimeFieldIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647) NOT NULL"
-      }, {
-        "name" : "rate",
-        "fieldType" : "INT"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[rowtime])"
-  }, {
-    "id" : 6,
-    "type" : "stream-exec-exchange_1",
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "HASH",
-        "keys" : [ 0 ]
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647) NOT NULL"
-      }, {
-        "name" : "rate",
-        "fieldType" : "INT"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "Exchange(distribution=[hash[currency]])"
-  }, {
-    "id" : 7,
-    "type" : "stream-exec-temporal-join_1",
-    "joinSpec" : {
-      "joinType" : "INNER",
-      "leftKeys" : [ 1 ],
-      "rightKeys" : [ 0 ],
-      "filterNulls" : [ true ],
-      "nonEquiCondition" : null
-    },
-    "isTemporalFunctionJoin" : false,
-    "leftTimeAttributeIndex" : 2,
-    "rightTimeAttributeIndex" : 2,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    }, {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : {
-      "type" : "ROW",
-      "fields" : [ {
-        "name" : "amount",
-        "fieldType" : "INT"
-      }, {
-        "name" : "currency",
-        "fieldType" : "VARCHAR(2147483647)"
-      }, {
-        "name" : "rowtime",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      }, {
-        "name" : "currency0",
-        "fieldType" : "VARCHAR(2147483647) NOT NULL"
-      }, {
-        "name" : "rate",
-        "fieldType" : "INT"
-      }, {
-        "name" : "rowtime0",
-        "fieldType" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
-          "precision" : 3,
-          "kind" : "ROWTIME"
-        }
-      } ]
-    },
-    "description" : "TemporalJoin(joinType=[InnerJoin], where=[((currency = 
currency0) AND __TEMPORAL_JOIN_CONDITION(rowtime, rowtime0, 
__TEMPORAL_JOIN_CONDITION_PRIMARY_KEY(currency0), 
__TEMPORAL_JOIN_LEFT_KEY(currency), __TEMPORAL_JOIN_RIGHT_KEY(currency0)))], 
select=[amount, currency, rowtime, currency0, rate, rowtime0])"
-  }, {
-    "id" : 8,
-    "type" : "stream-exec-calc_1",
-    "projection" : [ {
-      "kind" : "CALL",
-      "syntax" : "BINARY",
-      "internalName" : "$*$1",
-      "operands" : [ {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 0,
-        "type" : "INT"
-      }, {
-        "kind" : "INPUT_REF",
-        "inputIndex" : 4,
-        "type" : "INT"
-      } ],
-      "type" : "INT"
-    } ],
-    "condition" : null,
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`EXPR$0` INT>",
-    "description" : "Calc(select=[(amount * rate) AS EXPR$0])"
-  }, {
-    "id" : 9,
-    "type" : "stream-exec-sink_1",
-    "configuration" : {
-      "table.exec.sink.keyed-shuffle" : "AUTO",
-      "table.exec.sink.not-null-enforcer" : "ERROR",
-      "table.exec.sink.rowtime-inserter" : "ENABLED",
-      "table.exec.sink.type-length-enforcer" : "IGNORE",
-      "table.exec.sink.upsert-materialize" : "AUTO"
-    },
-    "dynamicTableSink" : {
-      "table" : {
-        "identifier" : "`default_catalog`.`default_database`.`MySink`",
-        "resolvedTable" : {
-          "schema" : {
-            "columns" : [ {
-              "name" : "a",
-              "dataType" : "INT"
-            } ],
-            "watermarkSpecs" : [ ]
-          },
-          "partitionKeys" : [ ],
-          "options" : {
-            "connector" : "values",
-            "table-sink-class" : "DEFAULT"
-          }
-        }
-      }
-    },
-    "inputChangelogMode" : [ "INSERT" ],
-    "inputProperties" : [ {
-      "requiredDistribution" : {
-        "type" : "UNKNOWN"
-      },
-      "damBehavior" : "PIPELINED",
-      "priority" : 0
-    } ],
-    "outputType" : "ROW<`EXPR$0` INT>",
-    "description" : "Sink(table=[default_catalog.default_database.MySink], 
fields=[EXPR$0])"
-  } ],
-  "edges" : [ {
-    "source" : 1,
-    "target" : 2,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 2,
-    "target" : 3,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 4,
-    "target" : 5,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 5,
-    "target" : 6,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 3,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 6,
-    "target" : 7,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 7,
-    "target" : 8,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  }, {
-    "source" : 8,
-    "target" : 9,
-    "shuffle" : {
-      "type" : "FORWARD"
-    },
-    "shuffleMode" : "PIPELINED"
-  } ]
-}


Reply via email to