This is an automated email from the ASF dual-hosted git repository.
pacinogong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4c6a61b45 [INLONG-6784][Sort] Support apache hudi LoadNode (#6789)
4c6a61b45 is described below
commit 4c6a61b45b29aa49658a86d8799600538f86156e
Author: averyzhang <[email protected]>
AuthorDate: Fri Dec 9 19:52:54 2022 +0800
[INLONG-6784][Sort] Support apache hudi LoadNode (#6789)
---
.../apache/inlong/sort/protocol/node/LoadNode.java | 5 +-
.../org/apache/inlong/sort/protocol/node/Node.java | 3 +
.../sort/protocol/node/load/HudiLoadNode.java | 174 +++++++++++++++++++
.../sort/protocol/node/load/HudiLoadNodeTest.java | 57 +++++++
.../inlong/sort/parser/HudiNodeSqlParserTest.java | 186 +++++++++++++++++++++
5 files changed, 424 insertions(+), 1 deletion(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
index 72cc42931..da8859e8c 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java
@@ -36,6 +36,7 @@ import
org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -71,7 +72,9 @@ import java.util.Map;
@JsonSubTypes.Type(value = GreenplumLoadNode.class, name =
"greenplumLoad"),
@JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name =
"dlcIcebergLoad"),
@JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
- @JsonSubTypes.Type(value = StarRocksLoadNode.class, name =
"starRocksLoad")
+ @JsonSubTypes.Type(value = StarRocksLoadNode.class, name =
"starRocksLoad"),
+ @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
+ @JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
})
@NoArgsConstructor
@Data
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
index 7f37075e0..087a5a9a7 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/Node.java
@@ -41,6 +41,7 @@ import
org.apache.inlong.sort.protocol.node.load.FileSystemLoadNode;
import org.apache.inlong.sort.protocol.node.load.GreenplumLoadNode;
import org.apache.inlong.sort.protocol.node.load.HbaseLoadNode;
import org.apache.inlong.sort.protocol.node.load.HiveLoadNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
import org.apache.inlong.sort.protocol.node.load.IcebergLoadNode;
import org.apache.inlong.sort.protocol.node.load.KafkaLoadNode;
import org.apache.inlong.sort.protocol.node.load.MySqlLoadNode;
@@ -90,6 +91,8 @@ import java.util.TreeMap;
@JsonSubTypes.Type(value = GreenplumLoadNode.class, name =
"greenplumLoad"),
@JsonSubTypes.Type(value = DLCIcebergLoadNode.class, name =
"dlcIcebergLoad"),
@JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
+ @JsonSubTypes.Type(value = HudiLoadNode.class, name = "hudiLoad"),
+ @JsonSubTypes.Type(value = DorisLoadNode.class, name = "dorisLoad"),
@JsonSubTypes.Type(value = StarRocksLoadNode.class, name =
"starRocksLoad"),
})
public interface Node {
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
new file mode 100644
index 000000000..86306dea0
--- /dev/null
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNode.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.InlongMetric;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.enums.FilterStrategy;
+import org.apache.inlong.sort.protocol.node.LoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+
+@JsonTypeName("hudiLoad")
+@Data
+@NoArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class HudiLoadNode extends LoadNode implements InlongMetric,
Serializable {
+
+ private static final long serialVersionUID = -1L;
+
+ private static final String HUDI_OPTION_HIVE_SYNC_ENABLED =
"hive_sync.enabled";
+ private static final String HUDI_OPTION_HIVE_SYNC_DB = "hive_sync.db";
+ private static final String HUDI_OPTION_HIVE_SYNC_TABLE =
"hive_sync.table";
+ private static final String HUDI_OPTION_HIVE_SYNC_FILE_FORMAT =
"hive_sync.file_format";
+ private static final String HUDI_OPTION_HIVE_SYNC_MODE = "hive_sync.mode";
+ private static final String HUDI_OPTION_HIVE_SYNC_MODE_HMS_VALUE = "hms";
+ private static final String HUDI_OPTION_HIVE_SYNC_METASTORE_URIS =
"hive_sync.metastore.uris";
+ private static final String HUDI_OPTION_DEFAULT_PATH = "path";
+ private static final String HUDI_OPTION_DATABASE_NAME =
"hoodie.database.name";
+ private static final String HUDI_OPTION_TABLE_NAME = "hoodie.table.name";
+ private static final String HUDI_OPTION_RECORDKEY_FIELD_NAME =
"hoodie.datasource.write.recordkey.field";
+ private static final String HUDI_OPTION_PARTITIONPATH_FIELD_NAME =
"hoodie.datasource.write.partitionpath.field";
+ private static final String DDL_ATTRIBUTE_HUDI = "hudi.";
+
+ @JsonProperty("tableName")
+ @Nonnull
+ private String tableName;
+
+ @JsonProperty("dbName")
+ @Nonnull
+ private String dbName;
+
+ @JsonProperty("primaryKey")
+ private String primaryKey;
+
+ @JsonProperty("catalogType")
+ private CatalogType catalogType;
+
+ @JsonProperty("uri")
+ private String uri;
+
+ @JsonProperty("warehouse")
+ private String warehouse;
+
+ @JsonProperty("extList")
+ private List<HashMap<String, String>> extList;
+
+ @JsonProperty("partitionFields")
+ private List<FieldInfo> partitionFields;
+
+ @JsonCreator
+ public HudiLoadNode(
+ @JsonProperty("id") String id,
+ @JsonProperty("name") String name,
+ @JsonProperty("fields") List<FieldInfo> fields,
+ @JsonProperty("fieldRelations") List<FieldRelation> fieldRelations,
+ @JsonProperty("filters") List<FilterFunction> filters,
+ @JsonProperty("filterStrategy") FilterStrategy filterStrategy,
+ @Nullable @JsonProperty("sinkParallelism") Integer sinkParallelism,
+ @JsonProperty("properties") Map<String, String> properties,
+ @Nonnull @JsonProperty("dbName") String dbName,
+ @Nonnull @JsonProperty("tableName") String tableName,
+ @JsonProperty("primaryKey") String primaryKey,
+ @JsonProperty("catalogType") CatalogType catalogType,
+ @JsonProperty("uri") String uri,
+ @JsonProperty("warehouse") String warehouse,
+ @JsonProperty("extList") List<HashMap<String, String>> extList,
+ @JsonProperty("partitionFields") List<FieldInfo> partitionFields) {
+ super(id, name, fields, fieldRelations, filters, filterStrategy,
sinkParallelism, properties);
+ this.tableName = Preconditions.checkNotNull(tableName, "table name is
null");
+ this.dbName = Preconditions.checkNotNull(dbName, "db name is null");
+ this.primaryKey = primaryKey;
+ this.catalogType = catalogType == null ? CatalogType.HIVE :
catalogType;
+ this.uri = uri;
+ this.warehouse = warehouse;
+ this.extList = extList;
+ this.partitionFields = partitionFields;
+ }
+
+ @Override
+ public Map<String, String> tableOptions() {
+ Map<String, String> options = super.tableOptions();
+
+ options.put(HUDI_OPTION_HIVE_SYNC_ENABLED, "true");
+ options.put(HUDI_OPTION_HIVE_SYNC_MODE,
HUDI_OPTION_HIVE_SYNC_MODE_HMS_VALUE);
+ options.put(HUDI_OPTION_HIVE_SYNC_DB, dbName);
+ options.put(HUDI_OPTION_HIVE_SYNC_TABLE, tableName);
+ options.put(HUDI_OPTION_HIVE_SYNC_METASTORE_URIS, uri);
+
+ // partition field
+ if (partitionFields != null && !partitionFields.isEmpty()) {
+ String partitionKey =
+ partitionFields.stream()
+ .map(FieldInfo::getName)
+ .collect(Collectors.joining(","));
+ options.put(HUDI_OPTION_PARTITIONPATH_FIELD_NAME, partitionKey);
+ }
+
+ extList.forEach(ext -> {
+ String keyName = ext.get("keyName");
+ if (StringUtils.isNoneBlank(keyName) &&
+ keyName.startsWith(DDL_ATTRIBUTE_HUDI)) {
+ options.put(keyName, ext.get("keyValue"));
+ }
+ });
+
+ String path = String.format("%s/%s.db/%s", warehouse, dbName,
tableName);
+ options.put(HUDI_OPTION_DEFAULT_PATH, path);
+
+ options.put(HUDI_OPTION_DATABASE_NAME, dbName);
+ options.put(HUDI_OPTION_TABLE_NAME, tableName);
+ options.put(HUDI_OPTION_RECORDKEY_FIELD_NAME, primaryKey);
+ options.put("connector", "hudi-inlong");
+
+ return options;
+ }
+
+ @Override
+ public String genTableName() {
+ return tableName;
+ }
+
+ @Override
+ public String getPrimaryKey() {
+ return primaryKey;
+ }
+
+ @Override
+ public List<FieldInfo> getPartitionFields() {
+ return super.getPartitionFields();
+ }
+
+}
diff --git
a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
new file mode 100644
index 000000000..dabfd54ac
--- /dev/null
+++
b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/node/load/HudiLoadNodeTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.node.load;
+
+import com.google.common.collect.Maps;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.inlong.sort.SerializeBaseTest;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+
+/**
+ * Test for {@link HudiLoadNode}
+ */
+public class HudiLoadNodeTest extends SerializeBaseTest<HudiLoadNode> {
+
+ @Override
+ public HudiLoadNode getTestObject() {
+ Map<String, Object> properties = Maps.newHashMap();
+ return new HudiLoadNode("1", "test_hudi",
+ Collections.singletonList(new FieldInfo("id", new
StringFormatInfo())),
+ Collections.singletonList(new FieldRelation(new
FieldInfo("id", new StringFormatInfo()),
+ new FieldInfo("id", new StringFormatInfo()))),
+ null,
+ null,
+ 1,
+ null,
+ "test_db",
+ "test_table",
+ "id",
+ CatalogType.HIVE,
+ "thrift://localhost:9083",
+ "hdfs://localhost:9000/user/hudi/warehouse",
+ new ArrayList<>(),
+ new ArrayList<>());
+ }
+}
diff --git
a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
new file mode 100644
index 000000000..0cb9802d1
--- /dev/null
+++
b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/parser/HudiNodeSqlParserTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.parser;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.commons.compress.utils.Lists;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.inlong.sort.formats.common.FloatFormatInfo;
+import org.apache.inlong.sort.formats.common.IntFormatInfo;
+import org.apache.inlong.sort.formats.common.LongFormatInfo;
+import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.parser.impl.FlinkSqlParser;
+import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.GroupInfo;
+import org.apache.inlong.sort.protocol.StreamInfo;
+import org.apache.inlong.sort.protocol.constant.IcebergConstant.CatalogType;
+import org.apache.inlong.sort.protocol.node.Node;
+import org.apache.inlong.sort.protocol.node.extract.MySqlExtractNode;
+import org.apache.inlong.sort.protocol.node.load.HudiLoadNode;
+import org.apache.inlong.sort.protocol.transformation.FieldRelation;
+import org.apache.inlong.sort.protocol.transformation.relation.NodeRelation;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Hudi SQL parser.
+ */
+public class HudiNodeSqlParserTest extends AbstractTestBase {
+
+ private MySqlExtractNode buildMySQLExtractNode(String id) {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("salary", new FloatFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()),
+ new FieldInfo("event_type", new StringFormatInfo()));
+ // if you hope hive load mode of append, please add this config
+ Map<String, String> map = new HashMap<>();
+ map.put("append-mode", "true");
+ return new MySqlExtractNode(id, "mysql_input", fields,
+ null, map, "id",
+ Collections.singletonList("work1"), "localhost", "root",
"123456",
+ "inlong", null, null,
+ null, null);
+ }
+
+ private HudiLoadNode buildHudiLoadNodeWithHadoopCatalog() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("salary", new StringFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelation(new FieldInfo("name", new
StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelation(new FieldInfo("age", new
IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelation(new FieldInfo("ts", new
TimestampFormatInfo()),
+ new FieldInfo("ts", new
TimestampFormatInfo())));
+
+ List<HashMap<String, String>> extList = new ArrayList<>();
+ HashMap<String, String> map = new HashMap<>();
+ map.put("table.type", "MERGE_ON_READ");
+ extList.add(map);
+
+ return new HudiLoadNode(
+ "hudi",
+ "hudi_table_name",
+ fields,
+ relations,
+ null,
+ null,
+ null,
+ null,
+ "inlong",
+ "inlong_hudi",
+ null,
+ CatalogType.HADOOP,
+ null,
+ "hdfs://localhost:9000/hudi/warehouse",
+ extList,
+ Lists.newArrayList());
+ }
+
+ private HudiLoadNode buildHudiLoadNodeWithHiveCatalog() {
+ List<FieldInfo> fields = Arrays.asList(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo()),
+ new FieldInfo("ts", new TimestampFormatInfo()));
+ List<FieldRelation> relations = Arrays
+ .asList(new FieldRelation(new FieldInfo("id", new
LongFormatInfo()),
+ new FieldInfo("id", new LongFormatInfo())),
+ new FieldRelation(new FieldInfo("name", new
StringFormatInfo()),
+ new FieldInfo("name", new StringFormatInfo())),
+ new FieldRelation(new FieldInfo("age", new
IntFormatInfo()),
+ new FieldInfo("age", new IntFormatInfo())),
+ new FieldRelation(new FieldInfo("ts", new
TimestampFormatInfo()),
+ new FieldInfo("ts", new
TimestampFormatInfo())));
+ List<HashMap<String, String>> extList = new ArrayList<>();
+ HashMap<String, String> map = new HashMap<>();
+ map.put("table.type", "MERGE_ON_READ");
+ extList.add(map);
+
+ // set HIVE_CONF_DIR,or set uri and warehouse
+ return new HudiLoadNode(
+ "hudi",
+ "hudi_table_name",
+ fields,
+ relations,
+ null,
+ null,
+ null,
+ null,
+ "inlong",
+ "inlong_hudi",
+ null,
+ CatalogType.HIVE,
+ "thrift://localhost:9083",
+ "/hive/warehouse",
+ extList,
+ Lists.newArrayList());
+ }
+
+ /**
+ * build node relation
+ *
+ * @param inputs extract node
+ * @param outputs load node
+ * @return node relation
+ */
+ private NodeRelation buildNodeRelation(List<Node> inputs, List<Node>
outputs) {
+ List<String> inputIds =
inputs.stream().map(Node::getId).collect(Collectors.toList());
+ List<String> outputIds =
outputs.stream().map(Node::getId).collect(Collectors.toList());
+ return new NodeRelation(inputIds, outputIds);
+ }
+
+ @Test
+ public void testHudi() throws Exception {
+ EnvironmentSettings settings = EnvironmentSettings
+ .newInstance()
+ .useBlinkPlanner()
+ .inStreamingMode()
+ .build();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.enableCheckpointing(10000);
+ StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
settings);
+ Node inputNode = buildMySQLExtractNode("1");
+ Node outputNode = buildHudiLoadNodeWithHiveCatalog();
+ StreamInfo streamInfo = new StreamInfo("1L", Arrays.asList(inputNode,
outputNode),
+
Collections.singletonList(buildNodeRelation(Collections.singletonList(inputNode),
+ Collections.singletonList(outputNode))));
+ GroupInfo groupInfo = new GroupInfo("group_id",
Collections.singletonList(streamInfo));
+ FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv,
groupInfo);
+ FlinkSqlParseResult result = (FlinkSqlParseResult) parser.parse();
+ Assert.assertTrue(!result.getLoadSqls().isEmpty() &&
!result.getCreateTableSqls().isEmpty());
+ }
+}