gong commented on code in PR #7590:
URL: https://github.com/apache/inlong/pull/7590#discussion_r1135329061
##########
inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/AllMigrateTest.java:
##########
@@ -132,14 +149,43 @@ public void testAllMigrate() throws Exception {
Node inputNode = buildAllMigrateExtractNode();
Node outputNode = buildAllMigrateKafkaNode();
StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode,
outputNode),
-
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
Collections.singletonList(outputNode))));
GroupInfo groupInfo = new GroupInfo("1",
Collections.singletonList(streamInfo));
FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
ParseResult result = parser.parse();
Assert.assertTrue(result.tryExecute());
}
+ /**
+ * Test all migrate with two input nodes and one output node (two
relations)
+ *
+ * @throws Exception The exception may throws when execute the case
+ */
+ @Test
+ public void testAllMigrateMultiRelations() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node inputNode = buildAllMigrateExtractNode();
+ Node inputNode2 = buildAllMigrateExtractNode2();
+ Node outputNode = buildAllMigrateKafkaNode();
Review Comment:
The Kafka node id needs to be changed to 3.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]