This is an automated email from the ASF dual-hosted git repository.
loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new 5823e6ad Add connector tests (#624)
5823e6ad is described below
commit 5823e6ada624e7ce0471809378b62ef45daa7fb5
Author: Loognqiang <[email protected]>
AuthorDate: Wed Sep 17 15:04:29 2025 +0800
Add connector tests (#624)
* [ISSUE-619] move the Hive Connector unit tests to an independent module
* [ISSUE-619] fix package name import order
* [ISSUE-619] fix check style
* [ISSUE-619] address comment
---
.github/workflows/ci-jdk11.yml | 8 +-
.../geaflow-dsl-connector-tests/pom.xml | 79 ++++++
.../runtime/connector/ConnectorCompilerTest.java | 96 ++++++++
.../dsl/runtime/connector/ConnectorTester.java | 270 +++++++++++++++++++++
.../dsl/runtime/connector}/HiveSourceTest.java | 4 +-
.../src/test/resources/query/hive_source_001.sql | 0
.../schema/function/BuildInSqlFunctionTable.java | 1 -
geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml | 5 -
.../apache/geaflow/dsl/runtime/CompilerTest.java | 64 -----
geaflow/geaflow-dsl/pom.xml | 33 +--
10 files changed, 449 insertions(+), 111 deletions(-)
diff --git a/.github/workflows/ci-jdk11.yml b/.github/workflows/ci-jdk11.yml
index 94c10311..ef18446f 100644
--- a/.github/workflows/ci-jdk11.yml
+++ b/.github/workflows/ci-jdk11.yml
@@ -58,11 +58,5 @@ jobs:
- name: Build and Test On JDK 11
run: |
test_modules="!geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive,"
- test_modules+="!geaflow/geaflow-dsl/geaflow-dsl-runtime,"
-
test_modules+="!geaflow/geaflow-analytics-service/geaflow-analytics-service-common,"
-
test_modules+="!geaflow/geaflow-analytics-service/geaflow-analytics-service-client,"
-
test_modules+="!geaflow/geaflow-analytics-service/geaflow-analytics-service-server,"
- test_modules+="!geaflow/geaflow-examples,"
- test_modules+="!geaflow/geaflow-deploy/geaflow-assembly,"
- test_modules+="!geaflow-mcp"
+ test_modules+="!geaflow/geaflow-dsl/geaflow-dsl-connector-tests"
mvn -B -e clean test -Pjdk11 -pl "${test_modules}"
-Duser.timezone=Asia/Shanghai -Dlog4j.configuration="log4j.rootLogger=WARN,
stdout"
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/pom.xml
b/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/pom.xml
new file mode 100644
index 00000000..e544de10
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/pom.xml
@@ -0,0 +1,79 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl</artifactId>
+ <version>0.6.8-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>geaflow-dsl-connector-tests</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-runtime</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geaflow</groupId>
+ <artifactId>geaflow-dsl-connector-hive</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-util</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-http</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-continuation</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/java/org/apache/geaflow/dsl/runtime/connector/ConnectorCompilerTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/java/org/apache/geaflow/dsl/runtime/connector/ConnectorCompilerTest.java
new file mode 100644
index 00000000..6ea51040
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/java/org/apache/geaflow/dsl/runtime/connector/ConnectorCompilerTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.connector;
+
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.geaflow.dsl.common.compile.CompileContext;
+import org.apache.geaflow.dsl.common.compile.QueryCompiler;
+import org.apache.geaflow.dsl.runtime.QueryClient;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class ConnectorCompilerTest {
+
+ @Test
+ public void testFindUnResolvedPlugins() {
+ QueryCompiler compiler = new QueryClient();
+ CompileContext context = new CompileContext();
+
+ String script = "CREATE GRAPH IF NOT EXISTS dy_modern (\n"
+ + " Vertex person (\n"
+ + " id bigint ID,\n"
+ + " name varchar\n"
+ + " ),\n"
+ + " Edge knows (\n"
+ + " srcId bigint SOURCE ID,\n"
+ + " targetId bigint DESTINATION ID,\n"
+ + " weight double\n"
+ + " )\n"
+ + ") WITH (\n"
+ + " storeType='rocksdb',\n"
+ + " shardCount = 1\n"
+ + ");\n"
+ + "\n"
+ + "\n"
+ + "CREATE TABLE hive (\n"
+ + " id BIGINT,\n"
+ + " name VARCHAR,\n"
+ + " age INT\n"
+ + ") WITH (\n"
+ + " type='hive',\n"
+ + " geaflow.dsl.kafka.servers = 'localhost:9092',\n"
+ + " geaflow.dsl.kafka.topic = 'read-topic'\n"
+ + ");\n"
+ + "\n"
+ + "CREATE TABLE kafka_sink (\n"
+ + " id BIGINT,\n"
+ + " name VARCHAR,\n"
+ + " age INT\n"
+ + ") WITH (\n"
+ + " type='kafka',\n"
+ + " geaflow.dsl.kafka.servers = 'localhost:9092',\n"
+ + " geaflow.dsl.kafka.topic = 'write-topic'\n"
+ + ");\n"
+ + "\n"
+ + "CREATE TABLE kafka_123(\n"
+ + " id BIGINT,\n"
+ + " name VARCHAR,\n"
+ + " age INT\n"
+ + ") WITH (\n"
+ + " type='kafka123',\n"
+ + " geaflow.dsl.kafka.servers = 'localhost:9092',\n"
+ + " geaflow.dsl.kafka.topic = 'write-topic'\n"
+ + ");\n"
+ + "\n"
+ + "INSERT INTO kafka_sink\n"
+ + "SELECT * FROM kafka_source;";
+
+ Set<String> plugins = compiler.getDeclaredTablePlugins(script,
context);
+ Set<String> enginePlugins = compiler.getEnginePlugins();
+ Assert.assertEquals(plugins.size(), 3);
+ List<String> filteredSet = plugins.stream().filter(e ->
!enginePlugins.contains(e.toUpperCase()))
+ .collect(Collectors.toList());
+ Assert.assertEquals(filteredSet.size(), 1);
+
+ Assert.assertEquals(filteredSet.get(0), "kafka123");
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/java/org/apache/geaflow/dsl/runtime/connector/ConnectorTester.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/java/org/apache/geaflow/dsl/runtime/connector/ConnectorTester.java
new file mode 100644
index 00000000..03beeb0d
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/java/org/apache/geaflow/dsl/runtime/connector/ConnectorTester.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.connector;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.geaflow.cluster.system.ClusterMetaStore;
+import org.apache.geaflow.common.config.Configuration;
+import org.apache.geaflow.common.config.keys.DSLConfigKeys;
+import org.apache.geaflow.common.config.keys.ExecutionConfigKeys;
+import org.apache.geaflow.dsl.common.exception.GeaFlowDSLException;
+import org.apache.geaflow.dsl.connector.file.FileConstants;
+import org.apache.geaflow.dsl.runtime.QueryClient;
+import org.apache.geaflow.dsl.runtime.QueryContext;
+import org.apache.geaflow.dsl.runtime.engine.GQLPipeLine;
+import org.apache.geaflow.dsl.runtime.engine.GQLPipeLine.GQLPipelineHook;
+import org.apache.geaflow.env.Environment;
+import org.apache.geaflow.env.EnvironmentFactory;
+import org.apache.geaflow.file.FileConfigKeys;
+import
org.apache.geaflow.runtime.core.scheduler.resource.ScheduledWorkerManagerFactory;
+import org.testng.Assert;
+
+public class ConnectorTester implements Serializable {
+
+ private int testTimeWaitSeconds = 0;
+
+ public static final String INIT_DDL = "/query/modern_graph.sql";
+ public static final String DSL_STATE_REMOTE_PATH = "/tmp/dsl/";
+
+ private String queryPath;
+
+ private boolean compareWithOrder = false;
+
+ private String graphDefinePath;
+
+ private boolean hasCustomWindowConfig = false;
+
+ protected boolean dedupe = false;
+
+ private int workerNum = (int)
ExecutionConfigKeys.CONTAINER_WORKER_NUM.getDefaultValue();
+
+ private final Map<String, String> config = new HashMap<>();
+
+ private ConnectorTester() {
+ try {
+ initRemotePath();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static ConnectorTester build() {
+ return new ConnectorTester();
+ }
+
+
+ public ConnectorTester withQueryPath(String queryPath) {
+ this.queryPath = queryPath;
+ return this;
+ }
+
+ public ConnectorTester withConfig(String key, Object value) {
+ this.config.put(key, String.valueOf(value));
+ return this;
+ }
+
+ public ConnectorTester execute() throws Exception {
+ if (queryPath == null) {
+ throw new IllegalArgumentException("You should call
withQueryPath() before execute().");
+ }
+ Map<String, String> config = new HashMap<>();
+ if (!hasCustomWindowConfig) {
+ config.put(DSLConfigKeys.GEAFLOW_DSL_WINDOW_SIZE.getKey(),
String.valueOf(-1L));
+ }
+ config.put(FileConfigKeys.ROOT.getKey(), DSL_STATE_REMOTE_PATH);
+ config.put(DSLConfigKeys.GEAFLOW_DSL_QUERY_PATH.getKey(),
FileConstants.PREFIX_JAVA_RESOURCE + queryPath);
+ config.put(ExecutionConfigKeys.CONTAINER_WORKER_NUM.getKey(),
String.valueOf(workerNum));
+ config.putAll(this.config);
+ initResultDirectory();
+
+ Environment environment = EnvironmentFactory.onLocalEnvironment();
+ environment.getEnvironmentContext().withConfig(config);
+
+ GQLPipeLine gqlPipeLine = new GQLPipeLine(environment,
testTimeWaitSeconds);
+
+ String graphDefinePath = null;
+ if (this.graphDefinePath != null) {
+ graphDefinePath = this.graphDefinePath;
+ }
+ gqlPipeLine.setPipelineHook(new TestGQLPipelineHook(graphDefinePath,
queryPath));
+ try {
+ gqlPipeLine.execute();
+ } finally {
+ environment.shutdown();
+ ClusterMetaStore.close();
+ ScheduledWorkerManagerFactory.clear();
+ }
+ return this;
+ }
+
+ private void initResultDirectory() throws Exception {
+ // delete target file path
+ String targetPath = getTargetPath(queryPath);
+ File targetFile = new File(targetPath);
+ if (targetFile.exists()) {
+ FileUtils.forceDelete(targetFile);
+ }
+ }
+
+ private void initRemotePath() throws IOException {
+ // delete state remote path
+ File stateRemoteFile = new File(DSL_STATE_REMOTE_PATH);
+ if (stateRemoteFile.exists()) {
+ FileUtils.forceDelete(stateRemoteFile);
+ }
+ }
+
+ public void checkSinkResult() throws Exception {
+ checkSinkResult(null);
+ }
+
+ public void checkSinkResult(String dict) throws Exception {
+ String[] paths = queryPath.split("/");
+ String lastPath = paths[paths.length - 1];
+ String exceptPath = dict != null ? "/expect/" + dict + "/" +
lastPath.split("\\.")[0] + ".txt"
+ : "/expect/" + lastPath.split("\\.")[0] + ".txt";
+ String targetPath = getTargetPath(queryPath);
+ String expectResult = IOUtils.resourceToString(exceptPath,
Charset.defaultCharset()).trim();
+ String actualResult = readFile(targetPath);
+ compareResult(actualResult, expectResult);
+ }
+
+ private void compareResult(String actualResult, String expectResult) {
+ if (compareWithOrder) {
+ Assert.assertEquals(expectResult, actualResult);
+ } else {
+ String[] actualLines = actualResult.split("\n");
+ String[] expectLines = expectResult.split("\n");
+ if (dedupe) {
+ List<String> actualLinesDedupe =
Arrays.asList(actualLines).stream().distinct().collect(Collectors.toList());
+ actualLines = actualLinesDedupe.toArray(new String[0]);
+ List<String> expectLinesDedupe =
Arrays.asList(expectLines).stream().distinct().collect(Collectors.toList());
+ expectLines = expectLinesDedupe.toArray(new String[0]);
+ }
+ Arrays.sort(actualLines);
+ Arrays.sort(expectLines);
+
+ String actualSort = StringUtils.join(actualLines, "\n");
+ String expectSort = StringUtils.join(expectLines, "\n");
+ if (!Objects.equals(actualSort, expectSort)) {
+ Assert.assertEquals(expectResult, actualResult);
+ }
+ }
+ }
+
+ private String readFile(String path) throws IOException {
+ File file = new File(path);
+ if (file.isHidden()) {
+ return "";
+ }
+ if (file.isFile()) {
+ return IOUtils.toString(new File(path).toURI(),
Charset.defaultCharset()).trim();
+ }
+ File[] files = file.listFiles();
+ StringBuilder content = new StringBuilder();
+ if (files != null) {
+ for (File subFile : files) {
+ String readText = readFile(subFile.getAbsolutePath());
+ if (StringUtils.isBlank(readText)) {
+ continue;
+ }
+ if (content.length() > 0) {
+ content.append("\n");
+ }
+ content.append(readText);
+ }
+ }
+ return content.toString().trim();
+ }
+
+ private static String getTargetPath(String queryPath) {
+ assert queryPath != null;
+ String[] paths = queryPath.split("/");
+ String lastPath = paths[paths.length - 1];
+ String targetPath = "target/" + lastPath.split("\\.")[0];
+ String currentPath = new File(".").getAbsolutePath();
+ targetPath = currentPath.substring(0, currentPath.length() - 1) +
targetPath;
+ return targetPath;
+ }
+
+ private static class TestGQLPipelineHook implements GQLPipelineHook {
+
+ private final String graphDefinePath;
+
+ private final String queryPath;
+
+ public TestGQLPipelineHook(String graphDefinePath, String queryPath) {
+ this.graphDefinePath = graphDefinePath;
+ this.queryPath = queryPath;
+ }
+
+ @Override
+ public String rewriteScript(String script, Configuration
configuration) {
+ String result = script;
+ String regex = "\\$\\{[^}]+}";
+ Pattern pattern = Pattern.compile(regex);
+ Matcher matcher = pattern.matcher(result);
+ while (matcher.find()) {
+ String matchedField = matcher.group();
+ String replaceKey = matchedField.substring(2,
matchedField.length() - 1);
+ if (replaceKey.equals("target")) {
+ result = result.replace(matchedField,
getTargetPath(queryPath));
+ } else {
+ String replaceData = configuration.getString(replaceKey);
+ Preconditions.checkState(replaceData != null, "Not found
replace key:{}", replaceKey);
+ result = result.replace(matchedField, replaceData);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void beforeExecute(QueryClient queryClient, QueryContext
queryContext) {
+ if (graphDefinePath != null) {
+ try {
+ String ddl = IOUtils.resourceToString(graphDefinePath,
Charset.defaultCharset());
+ queryClient.executeQuery(ddl, queryContext);
+ } catch (IOException e) {
+ throw new GeaFlowDSLException(e);
+ }
+ }
+ }
+
+ @Override
+ public void afterExecute(QueryClient queryClient, QueryContext
queryContext) {
+
+ }
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/HiveSourceTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/java/org/apache/geaflow/dsl/runtime/connector/HiveSourceTest.java
similarity index 94%
rename from
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/HiveSourceTest.java
rename to
geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/java/org/apache/geaflow/dsl/runtime/connector/HiveSourceTest.java
index af57d249..3b4ed093 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/HiveSourceTest.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/java/org/apache/geaflow/dsl/runtime/connector/HiveSourceTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.geaflow.dsl.runtime.query;
+package org.apache.geaflow.dsl.runtime.connector;
import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
import org.testng.annotations.Test;
@@ -26,7 +26,7 @@ public class HiveSourceTest {
@Test(enabled = false)
public void testHiveSource_001() throws Exception {
- QueryTester
+ ConnectorTester
.build()
.withConfig(FrameworkConfigKeys.BATCH_NUMBER_PER_CHECKPOINT.getKey(), 1)
.withQueryPath("/query/hive_source_001.sql")
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/hive_source_001.sql
b/geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/resources/query/hive_source_001.sql
similarity index 100%
rename from
geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/hive_source_001.sql
rename to
geaflow/geaflow-dsl/geaflow-dsl-connector-tests/src/test/resources/query/hive_source_001.sql
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
index 4962c5f9..9afb8514 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-plan/src/main/java/org/apache/geaflow/dsl/schema/function/BuildInSqlFunctionTable.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
import org.apache.calcite.sql.SqlIdentifier;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
index 7bde5bf8..de11b9c2 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/pom.xml
@@ -71,11 +71,6 @@
<artifactId>geaflow-dsl-connector-pulsar</artifactId>
</dependency>
- <dependency>
- <groupId>org.apache.geaflow</groupId>
- <artifactId>geaflow-dsl-connector-hive</artifactId>
- </dependency>
-
<dependency>
<groupId>org.apache.geaflow</groupId>
<artifactId>geaflow-dsl-connector-socket</artifactId>
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/CompilerTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/CompilerTest.java
index 6b64bcfd..b18d273e 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/CompilerTest.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/CompilerTest.java
@@ -107,70 +107,6 @@ public class CompilerTest {
Assert.assertEquals(functions.get(3), "default.f0");
}
- @Test
- public void testFindUnResolvedPlugins() {
- QueryCompiler compiler = new QueryClient();
- CompileContext context = new CompileContext();
-
- String script = "CREATE GRAPH IF NOT EXISTS dy_modern (\n"
- + " Vertex person (\n"
- + " id bigint ID,\n"
- + " name varchar\n"
- + " ),\n"
- + " Edge knows (\n"
- + " srcId bigint SOURCE ID,\n"
- + " targetId bigint DESTINATION ID,\n"
- + " weight double\n"
- + " )\n"
- + ") WITH (\n"
- + " storeType='rocksdb',\n"
- + " shardCount = 1\n"
- + ");\n"
- + "\n"
- + "\n"
- + "CREATE TABLE hive (\n"
- + " id BIGINT,\n"
- + " name VARCHAR,\n"
- + " age INT\n"
- + ") WITH (\n"
- + " type='hive',\n"
- + " geaflow.dsl.kafka.servers = 'localhost:9092',\n"
- + " geaflow.dsl.kafka.topic = 'read-topic'\n"
- + ");\n"
- + "\n"
- + "CREATE TABLE kafka_sink (\n"
- + " id BIGINT,\n"
- + " name VARCHAR,\n"
- + " age INT\n"
- + ") WITH (\n"
- + " type='kafka',\n"
- + " geaflow.dsl.kafka.servers = 'localhost:9092',\n"
- + " geaflow.dsl.kafka.topic = 'write-topic'\n"
- + ");\n"
- + "\n"
- + "CREATE TABLE kafka_123(\n"
- + " id BIGINT,\n"
- + " name VARCHAR,\n"
- + " age INT\n"
- + ") WITH (\n"
- + " type='kafka123',\n"
- + " geaflow.dsl.kafka.servers = 'localhost:9092',\n"
- + " geaflow.dsl.kafka.topic = 'write-topic'\n"
- + ");\n"
- + "\n"
- + "INSERT INTO kafka_sink\n"
- + "SELECT * FROM kafka_source;";
-
- Set<String> plugins = compiler.getDeclaredTablePlugins(script,
context);
- Set<String> enginePlugins = compiler.getEnginePlugins();
- Assert.assertEquals(plugins.size(), 3);
- List<String> filteredSet = plugins.stream().filter(e ->
!enginePlugins.contains(e.toUpperCase()))
- .collect(Collectors.toList());
- Assert.assertEquals(filteredSet.size(), 1);
-
- Assert.assertEquals(filteredSet.get(0), "kafka123");
- }
-
@Test
public void testFindTables() {
QueryCompiler compiler = new QueryClient();
diff --git a/geaflow/geaflow-dsl/pom.xml b/geaflow/geaflow-dsl/pom.xml
index 10c9a775..b8d58db2 100644
--- a/geaflow/geaflow-dsl/pom.xml
+++ b/geaflow/geaflow-dsl/pom.xml
@@ -40,6 +40,7 @@
<module>geaflow-dsl-runtime</module>
<module>geaflow-dsl-connector</module>
<module>geaflow-dsl-catalog</module>
+ <module>geaflow-dsl-connector-tests</module>
</modules>
<properties>
@@ -153,38 +154,6 @@
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.geaflow</groupId>
- <artifactId>geaflow-dsl-connector-hive</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-http</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-server</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-continuation</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-servlet</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]