This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e87f416c7 [INLONG-3831][Sort] Add meta field support for sort
lightweight (#3985)
e87f416c7 is described below
commit e87f416c763906cd14aaa78de50d207d91bbbd36
Author: yunqingmoswu <[email protected]>
AuthorDate: Thu Apr 28 10:03:55 2022 +0800
[INLONG-3831][Sort] Add meta field support for sort lightweight (#3985)
---
.../inlong/sort/protocol/BuiltInFieldInfo.java | 50 +++++-
.../flink/parser/impl/FlinkSqlParser.java | 170 +++++++++++++++++--
.../flink/parser/MetaFieldSyncTest.java | 179 +++++++++++++++++++++
3 files changed, 389 insertions(+), 10 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
index d73c9b243..81568a8ca 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/BuiltInFieldInfo.java
@@ -71,13 +71,61 @@ public class BuiltInFieldInfo extends FieldInfo {
}
public enum BuiltInField {
+ /**
+ * The event time of flink
+ */
DATA_TIME,
+ /**
+ * The process time of flink
+ */
PROCESS_TIME,
+ /**
+ * The name of the database containing this Row
+ */
MYSQL_METADATA_DATABASE,
+ /**
+ * The name of the table containing this Row
+ */
MYSQL_METADATA_TABLE,
+ /**
+ * The time when the Row made changes in the database
+ */
MYSQL_METADATA_EVENT_TIME,
+ /**
+ * Whether the DDL statement
+ */
MYSQL_METADATA_IS_DDL,
+ /**
+ * Type of database operation, such as INSERT/DELETE, etc.
+ */
MYSQL_METADATA_EVENT_TYPE,
- MYSQL_METADATA_DATA
+ /**
+ * MySQL binlog data Row
+ */
+ MYSQL_METADATA_DATA,
+ /**
+ * The value of the field before update
+ */
+ METADATA_UPDATE_BEFORE,
+ /**
+ * Batch id of binlog
+ */
+ METADATA_BATCH_ID,
+ /**
+ * Mapping of sql_type table fields to java data type IDs
+ */
+ METADATA_SQL_TYPE,
+ /**
+ * The current time when the ROW was received and processed
+ */
+ METADATA_TS,
+ /**
+ * The table structure
+ */
+ METADATA_MYSQL_TYPE,
+ /**
+ * Primary key field name
+ */
+ METADATA_PK_NAMES
}
}
diff --git
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
index d60282df0..29c24e404 100644
---
a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
+++
b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/parser/impl/FlinkSqlParser.java
@@ -22,12 +22,16 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.GroupInfo;
import org.apache.inlong.sort.protocol.StreamInfo;
import org.apache.inlong.sort.protocol.node.ExtractNode;
import org.apache.inlong.sort.protocol.node.LoadNode;
import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.KafkaExtractNode;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.transform.DistinctNode;
import org.apache.inlong.sort.protocol.node.transform.TransformNode;
import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
@@ -497,7 +501,7 @@ public class FlinkSqlParser implements Parser {
StringBuilder sb = new StringBuilder("CREATE TABLE `");
sb.append(node.genTableName()).append("`(\n");
sb.append(genPrimaryKey(node.getPrimaryKey()));
- sb.append(parseFields(node.getFields(), node instanceof LoadNode));
+ sb.append(parseFields(node.getFields(), node));
if (node instanceof ExtractNode) {
ExtractNode extractNode = (ExtractNode) node;
if (extractNode.getWatermarkField() != null) {
@@ -566,21 +570,16 @@ public class FlinkSqlParser implements Parser {
* Parse fields
*
* @param fields The fields defined in node
- * @param isLoad Where is load or not
+ * @param node The abstract of extract, transform, load
* @return Field format in select sql
*/
- private String parseFields(List<FieldInfo> fields, boolean isLoad) {
+ private String parseFields(List<FieldInfo> fields, Node node) {
StringBuilder sb = new StringBuilder();
for (FieldInfo field : fields) {
sb.append(" `").append(field.getName()).append("` ");
if (field instanceof BuiltInFieldInfo) {
BuiltInFieldInfo builtInFieldInfo = (BuiltInFieldInfo) field;
- switch (builtInFieldInfo.getBuiltInField()) {
- case PROCESS_TIME:
- sb.append(" AS PROCTIME()");
- break;
- default:
- }
+ parseMetaField(node, builtInFieldInfo, sb);
} else {
sb.append(TableFormatUtils.deriveLogicalType(field.getFormatInfo()).asSerializableString());
}
@@ -592,6 +591,159 @@ public class FlinkSqlParser implements Parser {
return sb.toString();
}
+ private void parseMetaField(Node node, BuiltInFieldInfo metaField,
StringBuilder sb) {
+ if (metaField.getBuiltInField() == BuiltInField.PROCESS_TIME) {
+ sb.append(" AS PROCTIME()");
+ return;
+ }
+ if (node instanceof MySqlExtractNode) {
+ sb.append(parseMySqlExtractNodeMetaField(metaField));
+ } else if (node instanceof KafkaExtractNode) {
+ sb.append(parseKafkaExtractNodeMetaField(metaField));
+ } else if (node instanceof KafkaLoadNode) {
+ sb.append(parseKafkaLoadNodeMetaField(metaField));
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("This node:%s does not currently support
metadata fields",
+ node.getClass().getName()));
+ }
+ }
+
+ private String parseKafkaLoadNodeMetaField(BuiltInFieldInfo metaField) {
+ String metaType;
+ switch (metaField.getBuiltInField()) {
+ case MYSQL_METADATA_TABLE:
+ metaType = "STRING METADATA FROM 'value.table'";
+ break;
+ case MYSQL_METADATA_DATABASE:
+ metaType = "STRING METADATA FROM 'value.database'";
+ break;
+ case MYSQL_METADATA_EVENT_TIME:
+ metaType = "TIMESTAMP(3) METADATA FROM 'value.op_ts'";
+ break;
+ case MYSQL_METADATA_EVENT_TYPE:
+ metaType = "STRING METADATA FROM 'value.op_type'";
+ break;
+ case MYSQL_METADATA_DATA:
+ metaType = "STRING METADATA FROM 'value.data'";
+ break;
+ case MYSQL_METADATA_IS_DDL:
+ metaType = "BOOLEAN METADATA FROM 'value.is_ddl'";
+ break;
+ case METADATA_TS:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ts'";
+ break;
+ case METADATA_SQL_TYPE:
+ metaType = "MAP<STRING, INT> METADATA FROM 'value.sql_type'";
+ break;
+ case METADATA_MYSQL_TYPE:
+ metaType = "MAP<STRING, STRING> METADATA FROM
'value.mysql_type'";
+ break;
+ case METADATA_PK_NAMES:
+ metaType = "ARRAY<STRING> METADATA FROM 'value.pk_names'";
+ break;
+ case METADATA_BATCH_ID:
+ metaType = "BIGINT METADATA FROM 'value.batch_id'";
+ break;
+ case METADATA_UPDATE_BEFORE:
+ metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM
'value.update_before'";
+ break;
+ default:
+ metaType =
TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSerializableString();
+ }
+ return metaType;
+ }
+
+ private String parseKafkaExtractNodeMetaField(BuiltInFieldInfo metaField) {
+ String metaType;
+ switch (metaField.getBuiltInField()) {
+ case MYSQL_METADATA_TABLE:
+ metaType = "STRING METADATA FROM 'value.table'";
+ break;
+ case MYSQL_METADATA_DATABASE:
+ metaType = "STRING METADATA FROM 'value.database'";
+ break;
+ case MYSQL_METADATA_EVENT_TIME:
+ metaType = "TIMESTAMP(3) METADATA FROM 'value.op_ts'";
+ break;
+ case MYSQL_METADATA_EVENT_TYPE:
+ metaType = "STRING METADATA FROM 'value.op_type'";
+ break;
+ case MYSQL_METADATA_DATA:
+ metaType = "STRING METADATA FROM 'value.data'";
+ break;
+ case MYSQL_METADATA_IS_DDL:
+ metaType = "BOOLEAN METADATA FROM 'value.is_ddl'";
+ break;
+ case METADATA_TS:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'value.ts'";
+ break;
+ case METADATA_SQL_TYPE:
+ metaType = "MAP<STRING, INT> METADATA FROM 'value.sql_type'";
+ break;
+ case METADATA_MYSQL_TYPE:
+ metaType = "MAP<STRING, STRING> METADATA FROM
'value.mysql_type'";
+ break;
+ case METADATA_PK_NAMES:
+ metaType = "ARRAY<STRING> METADATA FROM 'value.pk_names'";
+ break;
+ case METADATA_BATCH_ID:
+ metaType = "BIGINT METADATA FROM 'value.batch_id'";
+ break;
+ case METADATA_UPDATE_BEFORE:
+ metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM
'value.update_before'";
+ break;
+ default:
+ metaType =
TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSerializableString();
+ }
+ return metaType;
+ }
+
+ private String parseMySqlExtractNodeMetaField(BuiltInFieldInfo metaField) {
+ String metaType;
+ switch (metaField.getBuiltInField()) {
+ case MYSQL_METADATA_TABLE:
+ metaType = "STRING METADATA FROM 'meta.table_name' VIRTUAL";
+ break;
+ case MYSQL_METADATA_DATABASE:
+ metaType = "STRING METADATA FROM 'meta.database_name' VIRTUAL";
+ break;
+ case MYSQL_METADATA_EVENT_TIME:
+ metaType = "TIMESTAMP(3) METADATA FROM 'meta.op_ts' VIRTUAL";
+ break;
+ case MYSQL_METADATA_EVENT_TYPE:
+ metaType = "STRING METADATA FROM 'meta.op_type' VIRTUAL";
+ break;
+ case MYSQL_METADATA_DATA:
+ metaType = "STRING METADATA FROM 'meta.data' VIRTUAL";
+ break;
+ case MYSQL_METADATA_IS_DDL:
+ metaType = "BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL";
+ break;
+ case METADATA_TS:
+ metaType = "TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL";
+ break;
+ case METADATA_SQL_TYPE:
+ metaType = "MAP<STRING, INT> METADATA FROM 'meta.sql_type'
VIRTUAL";
+ break;
+ case METADATA_MYSQL_TYPE:
+ metaType = "MAP<STRING, STRING> METADATA FROM
'meta.mysql_type' VIRTUAL";
+ break;
+ case METADATA_PK_NAMES:
+ metaType = "ARRAY<STRING> METADATA FROM 'meta.pk_names'
VIRTUAL";
+ break;
+ case METADATA_BATCH_ID:
+ metaType = "BIGINT METADATA FROM 'meta.batch_id' VIRTUAL";
+ break;
+ case METADATA_UPDATE_BEFORE:
+ metaType = "ARRAY<MAP<STRING, STRING>> METADATA FROM
'meta.update_before' VIRTUAL";
+ break;
+ default:
+ metaType =
TableFormatUtils.deriveLogicalType(metaField.getFormatInfo()).asSerializableString();
+ }
+ return metaType;
+ }
+
/**
* Generate primary key format in sql
*
diff --git
a/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java
b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java
new file mode 100644
index 000000000..8e45babf7
--- /dev/null
+++
b/inlong-sort/sort-single-tenant/src/test/java/org/apache/inlong/sort/singletenant/flink/parser/MetaFieldSyncTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.parser;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
+import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.MapFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
+import org.apache.inlong.sort.protocol.BuiltInFieldInfo.BuiltInField;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelationShip;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
+import
org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+import
org.apache.inlong.sort.protocol.transformation.relation.NodeRelationShip;
+import org.apache.inlong.sort.singletenant.flink.parser.impl.FlinkSqlParser;
+import
org.apache.inlong.sort.singletenant.flink.parser.result.FlinkSqlParseResult;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Test for {@link FlinkSqlParser#parseMetaField(Node, BuiltInFieldInfo,
StringBuilder)}
+ */
+public class MetaFieldSyncTest extends AbstractTestBase {
+
+ private MySqlExtractNode buildMySQLExtractNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()),
+ new BuiltInFieldInfo("database", new TimestampFormatInfo(),
BuiltInField.MYSQL_METADATA_DATABASE),
+ new BuiltInFieldInfo("table", new TimestampFormatInfo(),
BuiltInField.MYSQL_METADATA_TABLE),
+ new BuiltInFieldInfo("pk_names", new TimestampFormatInfo(),
BuiltInField.METADATA_PK_NAMES),
+ new BuiltInFieldInfo("event_time", new TimestampFormatInfo(),
BuiltInField.MYSQL_METADATA_EVENT_TIME),
+ new BuiltInFieldInfo("event_type", new TimestampFormatInfo(),
BuiltInField.MYSQL_METADATA_EVENT_TYPE),
+ new BuiltInFieldInfo("isddl", new TimestampFormatInfo(),
BuiltInField.MYSQL_METADATA_IS_DDL),
+ new BuiltInFieldInfo("batch_id", new TimestampFormatInfo(),
BuiltInField.METADATA_BATCH_ID),
+ new BuiltInFieldInfo("mysql_type", new TimestampFormatInfo(),
BuiltInField.METADATA_MYSQL_TYPE),
+ new BuiltInFieldInfo("sql_type", new TimestampFormatInfo(),
BuiltInField.METADATA_SQL_TYPE),
+ new BuiltInFieldInfo("meta_ts", new TimestampFormatInfo(),
BuiltInField.METADATA_TS),
+ new BuiltInFieldInfo("up_before", new TimestampFormatInfo(),
BuiltInField.METADATA_UPDATE_BEFORE)
+ );
+ return new MySqlExtractNode("1", "mysql_input", fields, null, null,
"id",
+ Arrays.asList("worker", "worker2"), "localhost", "root",
"168998",
+ "wedata_datastudio", null, null, null, null);
+ }
+
+ private KafkaLoadNode buildKafkaNode() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldInfo("database", new StringFormatInfo()),
+ new FieldInfo("table", new StringFormatInfo()),
+ new FieldInfo("pk_names", new ArrayFormatInfo(new
StringFormatInfo())),
+ new FieldInfo("event_time", new TimestampFormatInfo()),
+ new FieldInfo("event_type", new StringFormatInfo()),
+ new FieldInfo("isddl", new BooleanFormatInfo()),
+ new FieldInfo("batch_id", new LongFormatInfo()),
+ new FieldInfo("mysql_type", new MapFormatInfo(new
StringFormatInfo(), new StringFormatInfo())),
+ new FieldInfo("sql_type", new MapFormatInfo(new
StringFormatInfo(), new IntFormatInfo())),
+ new FieldInfo("meta_ts", new TimestampFormatInfo()),
+ new FieldInfo("up_before",
+ new ArrayFormatInfo(new MapFormatInfo(new
StringFormatInfo(), new StringFormatInfo())))
+ );
+ List<FieldRelationShip> relations = Arrays
+ .asList(new FieldRelationShip(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelationShip(new FieldInfo("name", new
StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelationShip(new FieldInfo("age", new
IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelationShip(new FieldInfo("ts", new
TimestampFormatInfo()),
+ new FieldInfo("ts", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("database", new
TimestampFormatInfo()),
+ new FieldInfo("database", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("table", new
TimestampFormatInfo()),
+ new FieldInfo("table", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("pk_names", new
TimestampFormatInfo()),
+ new FieldInfo("pk_names", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("event_time", new
TimestampFormatInfo()),
+ new FieldInfo("event_time", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("event_type", new
TimestampFormatInfo()),
+ new FieldInfo("event_type", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("isddl", new
TimestampFormatInfo()),
+ new FieldInfo("isddl", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("batch_id", new
TimestampFormatInfo()),
+ new FieldInfo("batch_id", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("mysql_type", new
TimestampFormatInfo()),
+ new FieldInfo("mysql_type", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("sql_type", new
TimestampFormatInfo()),
+ new FieldInfo("sql_type", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("meta_ts", new
TimestampFormatInfo()),
+ new FieldInfo("meta_ts", new
TimestampFormatInfo())),
+ new FieldRelationShip(new FieldInfo("up_before", new
TimestampFormatInfo()),
+ new FieldInfo("up_before", new
TimestampFormatInfo()))
+ );
+ List<FilterFunction> filters = Arrays.asList(new
SingleValueFilterFunction(EmptyOperator.getInstance(),
+ new FieldInfo("name", new StringFormatInfo()),
+ EqualOperator.getInstance(), new
StringConstantParam("yunqingmo")));
+ KafkaLoadNode node = new KafkaLoadNode("2", "kafka_output2", fields,
relations,
+ null, "worker123", "localhost:9092",
+ new JsonFormat(), null,
+ null, "id");
+ return node;
+ }
+
+ public NodeRelationShip buildNodeRelation(List<Node> inputs, List<Node>
outputs) {
+ List<String> inputIds =
inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds =
outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelationShip(inputIds, outputIds);
+ }
+
+ /**
+ * Test meta field sync test
+ *
+ * @throws Exception The exception may throws when execute the case
+ */
+ @Test
+ public void testMetaFieldSyncTest() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node inputNode = buildMySQLExtractNode();
+ Node outputNode = buildKafkaNode();
+ StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode,
outputNode),
+
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("1",
Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
+ FlinkSqlParseResult result = parser.parse();
+ Assert.assertTrue(result.tryExecute());
+ }
+}