This is an automated email from the ASF dual-hosted git repository.
zhanglistar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c3aacfcd25 add uts (#9534)
c3aacfcd25 is described below
commit c3aacfcd2593cda65e54a18b024b68cad6d0ade1
Author: lgbo <[email protected]>
AuthorDate: Wed May 7 17:28:05 2025 +0800
add uts (#9534)
---
gluten-flink/loader/pom.xml | 2 +-
gluten-flink/planner/pom.xml | 2 +-
gluten-flink/pom.xml | 3 +-
gluten-flink/runtime/pom.xml | 2 +-
gluten-flink/ut/pom.xml | 164 +++++++++++++++++++++
.../stream/common/GlutenStreamingTestBase.java | 89 +++++++++++
.../runtime/stream/common/Velox4jEnvironment.java | 39 +++++
.../table/runtime/stream/custom/ScanTest.java | 53 +++++++
8 files changed, 350 insertions(+), 4 deletions(-)
diff --git a/gluten-flink/loader/pom.xml b/gluten-flink/loader/pom.xml
index bc32eedeac..fe73ee4d5d 100644
--- a/gluten-flink/loader/pom.xml
+++ b/gluten-flink/loader/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-flink</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/gluten-flink/planner/pom.xml b/gluten-flink/planner/pom.xml
index b3fc899fe5..44c27be131 100644
--- a/gluten-flink/planner/pom.xml
+++ b/gluten-flink/planner/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-flink</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/gluten-flink/pom.xml b/gluten-flink/pom.xml
index 696d16ace5..b3d3aaada9 100644
--- a/gluten-flink/pom.xml
+++ b/gluten-flink/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-parent</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
@@ -32,6 +32,7 @@
<module>planner</module>
<module>loader</module>
<module>runtime</module>
+ <module>ut</module>
</modules>
<properties>
diff --git a/gluten-flink/runtime/pom.xml b/gluten-flink/runtime/pom.xml
index b02dafbefa..fd055f439e 100644
--- a/gluten-flink/runtime/pom.xml
+++ b/gluten-flink/runtime/pom.xml
@@ -20,7 +20,7 @@
<parent>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-flink</artifactId>
- <version>1.4.0-SNAPSHOT</version>
+ <version>1.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/gluten-flink/ut/pom.xml b/gluten-flink/ut/pom.xml
new file mode 100644
index 0000000000..6f4f87351a
--- /dev/null
+++ b/gluten-flink/ut/pom.xml
@@ -0,0 +1,164 @@
+
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-flink</artifactId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+
+ <artifactId>gluten-flink-ut</artifactId>
+ <name>Gluten Flink UT</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <assertj.version>3.24.2</assertj.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-flink-loader</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-flink-runtime</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-loader</artifactId>
+ <!-- Use the shared Flink version property, like every other org.apache.flink
+ artifact in this POM, so the planner loader cannot drift from the rest. -->
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <classifier>tests</classifier>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.github.zhztheplayer</groupId>
+ <artifactId>velox4j</artifactId>
+ <version>${velox4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <version>5.9.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <version>5.9.2</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>${assertj.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>2.0.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.4.7</version>
+ </dependency>
+
+ </dependencies>
+
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>3.0.0-M5</version>
+ <configuration>
+ <skipTests>false</skipTests>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>4.8.1</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/gluten-flink/ut/src/test/java/org/apache/flink/table/runtime/stream/common/GlutenStreamingTestBase.java
b/gluten-flink/ut/src/test/java/org/apache/flink/table/runtime/stream/common/GlutenStreamingTestBase.java
new file mode 100644
index 0000000000..601953c196
--- /dev/null
+++
b/gluten-flink/ut/src/test/java/org/apache/flink/table/runtime/stream/common/GlutenStreamingTestBase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.common;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Common base class for Gluten streaming SQL tests.
+ *
+ * <p>Builds on Flink's {@link StreamingTestBase} and additionally makes sure Velox4j has been
+ * initialized before any test of a subclass runs. It also offers small helpers to register a
+ * bounded in-memory table, to extract the physical execution plan of a query, and to run a query
+ * and compare its result rows with an expected list.
+ */
+public class GlutenStreamingTestBase extends StreamingTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(GlutenStreamingTestBase.class);
+
+  /** Header that precedes the physical execution plan section in the explain output. */
+  private static final String EXECUTION_PLAN_PREFIX = "== Physical Execution Plan ==";
+
+  /** Initializes Velox4j before the tests of a class run; repeated calls are harmless. */
+  @BeforeAll
+  public static void setup() throws Exception {
+    LOG.info("GlutenStreamingTestBase setup");
+    Velox4jEnvironment.initializeOnce();
+  }
+
+  /**
+   * Registers {@code rows} with the test values factory and creates a bounded table backed by
+   * the {@code values} connector.
+   *
+   * <p>{@code tableName} and {@code schema} are spliced into the DDL verbatim (no quoting or
+   * escaping), so they must already be valid SQL fragments.
+   *
+   * @param tableName name of the table to create
+   * @param schema column list in the format {@code "a int, b bigint, c string"}
+   * @param rows rows that the table will produce
+   */
+  protected void createSimpleBoundedValuesTable(String tableName, String schema, List<Row> rows) {
+    String myTableDataId = TestValuesTableFactory.registerData(rows);
+    String table =
+        "CREATE TABLE "
+            + tableName + "(\n"
+            + schema + "\n"
+            + ") WITH (\n"
+            + " 'connector' = 'values',\n"
+            + " 'bounded' = 'true',\n"
+            + String.format(" 'data-id' = '%s',\n", myTableDataId)
+            + " 'nested-projection-supported' = 'true'\n"
+            + ")";
+    tEnv().executeSql(table);
+  }
+
+  /**
+   * Returns the execution plan (represented by StreamExecNode) of {@code query}.
+   *
+   * @param query SQL query to explain
+   * @return the explain text that follows the physical-execution-plan header, or an empty string
+   *     when the explain output contains no such header
+   */
+  protected String explainExecutionPlan(String query) {
+    Table table = tEnv().sqlQuery(query);
+    String plainPlans = table.explain(ExplainDetail.JSON_EXECUTION_PLAN);
+    int index = plainPlans.indexOf(EXECUTION_PLAN_PREFIX);
+    if (index == -1) {
+      return "";
+    }
+    return plainPlans.substring(index + EXECUTION_PLAN_PREFIX.length());
+  }
+
+  /**
+   * Executes {@code query}, converts every result row to its string form, sorts those strings in
+   * natural order and asserts that the sorted list equals {@code expected}.
+   *
+   * <p>Only the actual rows are sorted, so {@code expected} must already be in natural string
+   * order.
+   *
+   * @param query SQL query to execute
+   * @param expected expected row strings, e.g. {@code "+I[1, 1, 1]"}, in sorted order
+   */
+  protected void runAndCheck(String query, List<String> expected) {
+    List<String> actual;
+    // The iterator returned by collect() holds job resources and has to be closed explicitly.
+    try (org.apache.flink.util.CloseableIterator<Row> results =
+        tEnv().executeSql(query).collect()) {
+      actual =
+          CollectionUtil.iteratorToList(results).stream()
+              .map(Object::toString)
+              .collect(Collectors.toList());
+    } catch (RuntimeException e) {
+      // Keep planner/runtime failures exactly as callers saw them before.
+      throw e;
+    } catch (Exception e) {
+      // Only close() can raise a checked exception here.
+      throw new IllegalStateException("Failed to close the result iterator of query: " + query, e);
+    }
+    actual.sort(String::compareTo);
+    assertThat(actual).isEqualTo(expected);
+  }
+}
+
diff --git
a/gluten-flink/ut/src/test/java/org/apache/flink/table/runtime/stream/common/Velox4jEnvironment.java
b/gluten-flink/ut/src/test/java/org/apache/flink/table/runtime/stream/common/Velox4jEnvironment.java
new file mode 100644
index 0000000000..d59667a6a6
--- /dev/null
+++
b/gluten-flink/ut/src/test/java/org/apache/flink/table/runtime/stream/common/Velox4jEnvironment.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.stream.common;
+
+import io.github.zhztheplayer.velox4j.Velox4j;
+
+/**
+ * Test-side guard around {@code Velox4j.initialize()}.
+ *
+ * <p>The only instance is created when the private nested holder class is first initialized, and
+ * the constructor is the only place that calls {@code Velox4j.initialize()}.
+ */
+public class Velox4jEnvironment {
+
+  /** Calls {@code Velox4j.initialize()}; private so instances only come from the holder. */
+  private Velox4jEnvironment() {
+    Velox4j.initialize();
+  }
+
+  /** Returns the shared instance, creating it on first access. */
+  public static Velox4jEnvironment getInstance() {
+    return LazyInstance.INSTANCE;
+  }
+
+  /**
+   * Forces creation of the shared instance if that has not happened yet.
+   *
+   * @return {@code true} when the shared instance is available
+   */
+  public static boolean initializeOnce() {
+    return LazyInstance.INSTANCE != null;
+  }
+
+  /** Holder whose static initializer builds the single {@link Velox4jEnvironment}. */
+  private static class LazyInstance {
+    private static final Velox4jEnvironment INSTANCE = new Velox4jEnvironment();
+  }
+}
+
diff --git
a/gluten-flink/ut/src/test/java/org/apache/flink/table/runtime/stream/custom/ScanTest.java
b/gluten-flink/ut/src/test/java/org/apache/flink/table/runtime/stream/custom/ScanTest.java
new file mode 100644
index 0000000000..78260cf09c
--- /dev/null
+++
b/gluten-flink/ut/src/test/java/org/apache/flink/table/runtime/stream/custom/ScanTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.table.runtime.stream.custom;
+
+import org.apache.flink.table.runtime.stream.common.GlutenStreamingTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+/**
+ * Runs a filtering projection over a bounded three-row table named {@code MyTable} and checks
+ * the collected rows.
+ */
+class ScanTest extends GlutenStreamingTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ScanTest.class);
+
+  /** Runs the parent setup and then (re)creates {@code MyTable} with three rows. */
+  @Override
+  @BeforeEach
+  public void before() throws Exception {
+    super.before();
+    List<Row> tableRows =
+        Arrays.asList(
+            Row.of(1, 1L, "1"),
+            Row.of(2, 2L, "2"),
+            Row.of(3, 3L, "3"));
+    createSimpleBoundedValuesTable("MyTable", "a int, b bigint, c string", tableRows);
+  }
+
+  /** Every row satisfies {@code a > 0}, so all three rows are expected back. */
+  @Test
+  void testFilter() {
+    final String sql = "select a, b as b,c from MyTable where a > 0";
+    final List<String> expectedRows =
+        Arrays.asList("+I[1, 1, 1]", "+I[2, 2, 2]", "+I[3, 3, 3]");
+    LOG.info("execution plan: {}", explainExecutionPlan(sql));
+    runAndCheck(sql, expectedRows);
+  }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]