This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 97c5504f716 branch-4.0: [improve](case) Fix spark flink case in jdk17
#56992 (#57080)
97c5504f716 is described below
commit 97c5504f7168199885b222f813612ef4d2dc9e24
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 17 14:59:25 2025 +0800
branch-4.0: [improve](case) Fix spark flink case in jdk17 #56992 (#57080)
Cherry-picked from #56992
Co-authored-by: wudi <[email protected]>
---
.../spark_connector/spark_connector.groovy | 41 --------
.../spark_connector/spark_connector_arrow.groovy | 15 ++-
.../spark_connector_read_type.groovy | 16 ++-
.../flink_connector_p0/flink_connector.groovy | 8 +-
.../flink_connector_syncdb.groovy | 14 ++-
.../flink_connector_p0/flink_connector_type.groovy | 108 +++++++--------------
run-regression-test.sh | 2 +-
7 files changed, 81 insertions(+), 123 deletions(-)
diff --git
a/regression-test/suites/connector_p0/spark_connector/spark_connector.groovy
b/regression-test/suites/connector_p0/spark_connector/spark_connector.groovy
deleted file mode 100644
index 06699d7c8ff..00000000000
--- a/regression-test/suites/connector_p0/spark_connector/spark_connector.groovy
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite("spark_connector", "connector") {
-
- def tableName = "spark_connector"
- sql """DROP TABLE IF EXISTS ${tableName}"""
- sql new File("""${context.file.parent}/ddl/create.sql""").text
- logger.info("start delete local spark doris demo jar...")
- def delete_local_spark_jar = "rm -rf spark-doris-demo.jar".execute()
- logger.info("start download spark doris demo ...")
- logger.info("getS3Url ==== ${getS3Url()}")
- def download_spark_jar = "/usr/bin/curl
${getS3Url()}/regression/spark-doris-connector-demo-jar-with-dependencies.jar
--output spark-doris-demo.jar".execute().getText()
- def out = "/usr/bin/ls -al spark-doris-demo.jar".execute().getText()
- logger.info("finish download spark doris demo, out: ${out}")
- def run_cmd = "java -jar spark-doris-demo.jar
$context.config.feHttpAddress $context.config.feHttpUser
regression_test_connector_p0_spark_connector.$tableName"
- logger.info("run_cmd : $run_cmd")
- def proc = run_cmd.execute()
- def sout = new StringBuilder()
- def serr = new StringBuilder()
- proc.consumeProcessOutput(sout, serr)
- proc.waitForOrKill(1200_000)
- if (proc.exitValue() != 0) {
- logger.warn("failed to execute jar: code=${proc.exitValue()}, " +
"output: ${sout.toString()}, error: ${serr.toString()}")
- }
- qt_select """ select * from $tableName order by order_id"""
-}
diff --git
a/regression-test/suites/connector_p0/spark_connector/spark_connector_arrow.groovy
b/regression-test/suites/connector_p0/spark_connector/spark_connector_arrow.groovy
index a5fbc3b2835..7d9d07e7af3 100644
---
a/regression-test/suites/connector_p0/spark_connector/spark_connector_arrow.groovy
+++
b/regression-test/suites/connector_p0/spark_connector/spark_connector_arrow.groovy
@@ -126,7 +126,7 @@ suite("spark_connector_for_arrow", "connector") {
sql """DELETE FROM spark_connector_map where id > 0"""
sql """DELETE FROM spark_connector_struct where id > 0"""
- def jar_name =
"spark-doris-connector-3.1_2.12-1.3.0-SNAPSHOT-with-dependencies.jar"
+ def jar_name = "spark-doris-connector-3.4_2.12-1.3.0-SNAPSHOT.jar"
logger.info("start delete local spark doris demo jar...")
def delete_local_spark_jar = "rm -rf ${jar_name}".execute()
@@ -134,7 +134,18 @@ suite("spark_connector_for_arrow", "connector") {
logger.info("getS3Url ==== ${getS3Url()}")
def download_spark_jar = "/usr/bin/curl
${getS3Url()}/regression/${jar_name} --output ${jar_name}".execute().getText()
logger.info("finish download spark doris demo ...")
- def run_cmd = "java -cp ${jar_name}
org.apache.doris.spark.testcase.TestStreamLoadForArrowType
$context.config.feHttpAddress $context.config.feHttpUser
regression_test_connector_p0_spark_connector"
+
+ def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+ logger.info("System java path: ${javaPath}")
+
+ def javaVersion = System.getProperty("java.version")
+ logger.info("System java version: ${javaVersion}")
+ def addOpens = ""
+ if (javaVersion.startsWith("17")) {
+ addOpens = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED"
+ }
+
+ def run_cmd = "${javaPath} ${addOpens} -cp ${jar_name}
org.apache.doris.spark.testcase.TestStreamLoadForArrowType
$context.config.feHttpAddress $context.config.feHttpUser
regression_test_connector_p0_spark_connector"
logger.info("run_cmd : $run_cmd")
def proc = run_cmd.execute()
def sout = new StringBuilder()
diff --git
a/regression-test/suites/connector_p0/spark_connector/spark_connector_read_type.groovy
b/regression-test/suites/connector_p0/spark_connector/spark_connector_read_type.groovy
index 632e5e3d401..03345335105 100644
---
a/regression-test/suites/connector_p0/spark_connector/spark_connector_read_type.groovy
+++
b/regression-test/suites/connector_p0/spark_connector/spark_connector_read_type.groovy
@@ -97,7 +97,18 @@ suite("spark_connector_read_type", "connector") {
logger.info("getS3Url ==== ${getS3Url()}")
def download_spark_jar = "/usr/bin/curl
${getS3Url()}/regression/spark-doris-read-jar-with-dependencies.jar --output
spark-doris-read.jar".execute().getText()
logger.info("finish download spark doris demo ...")
- def run_cmd = "java -jar spark-doris-read.jar
$context.config.feHttpAddress $context.config.feHttpUser
regression_test_connector_p0_spark_connector.$tableReadName
regression_test_connector_p0_spark_connector.$tableWriterName"
+
+ def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+ logger.info("System java path: ${javaPath}")
+
+ def javaVersion = System.getProperty("java.version")
+ logger.info("System java version: ${javaVersion}")
+ def addOpens = ""
+ if (javaVersion.startsWith("17")) {
+ addOpens = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
--add-opens=java.base/java.nio=ALL-UNNAMED
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
+ }
+
+ def run_cmd = "${javaPath} ${addOpens} -jar spark-doris-read.jar
$context.config.feHttpAddress $context.config.feHttpUser
regression_test_connector_p0_spark_connector.$tableReadName
regression_test_connector_p0_spark_connector.$tableWriterName"
logger.info("run_cmd : $run_cmd")
def proc = run_cmd.execute()
def sout = new StringBuilder()
@@ -109,7 +120,4 @@ suite("spark_connector_read_type", "connector") {
}
qt_select """ select * from $tableWriterName order by id"""
-
-
-
}
diff --git a/regression-test/suites/flink_connector_p0/flink_connector.groovy
b/regression-test/suites/flink_connector_p0/flink_connector.groovy
index 5981895d8ae..93b0552919b 100644
--- a/regression-test/suites/flink_connector_p0/flink_connector.groovy
+++ b/regression-test/suites/flink_connector_p0/flink_connector.groovy
@@ -42,7 +42,13 @@ suite("flink_connector") {
return
}
- def run_cmd = "java -cp flink-doris-demo.jar
com.doris.DorisFlinkDfSinkDemo $context.config.feHttpAddress
regression_test_flink_connector_p0.$tableName $context.config.feHttpUser"
+ def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+ logger.info("System java path: ${javaPath}")
+
+ def javaVersion = System.getProperty("java.version")
+ logger.info("System java version: ${javaVersion}")
+
+ def run_cmd = "${javaPath} -cp flink-doris-demo.jar
com.doris.DorisFlinkDfSinkDemo $context.config.feHttpAddress
regression_test_flink_connector_p0.$tableName $context.config.feHttpUser"
logger.info("run_cmd : $run_cmd")
def run_flink_jar = run_cmd.execute().getText()
logger.info("result: $run_flink_jar")
diff --git
a/regression-test/suites/flink_connector_p0/flink_connector_syncdb.groovy
b/regression-test/suites/flink_connector_p0/flink_connector_syncdb.groovy
index 0496eab75ae..ad72bbfaa4e 100644
--- a/regression-test/suites/flink_connector_p0/flink_connector_syncdb.groovy
+++ b/regression-test/suites/flink_connector_p0/flink_connector_syncdb.groovy
@@ -74,7 +74,17 @@ PROPERTIES (
throw new Exception("File flink-doris-syncdb.jar download failed.")
}
- def run_cmd = "java -cp flink-doris-syncdb.jar
org.apache.doris.DatabaseFullSync $context.config.feHttpAddress
regression_test_flink_connector_p0 $context.config.feHttpUser"
+ def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+ logger.info("System java path: ${javaPath}")
+ def javaVersion = System.getProperty("java.version")
+ logger.info("System java version: ${javaVersion}")
+
+ def addOpens = ""
+ if (javaVersion.startsWith("17")) {
+ addOpens = "--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens
java.base/java.lang=ALL-UNNAMED"
+ }
+
+ def run_cmd = "${javaPath} ${addOpens} -cp flink-doris-syncdb.jar
org.apache.doris.DatabaseFullSync $context.config.feHttpAddress
regression_test_flink_connector_p0 $context.config.feHttpUser"
logger.info("run_cmd : $run_cmd")
def run_flink_jar = run_cmd.execute().getText()
logger.info("result: $run_flink_jar")
@@ -85,7 +95,7 @@ PROPERTIES (
logger.info("retry $tableName1 count: $resultTbl1")
def resultTbl2 = sql """ select count(1) from $tableName2"""
logger.info("retry $tableName2 count: $resultTbl2")
- resultTbl1.size() >= 1 && resultTbl2.size >=1
+ resultTbl1.size() >= 1 && resultTbl2.size() >=1
})
qt_select """ select * from $tableName1 order by id"""
diff --git
a/regression-test/suites/flink_connector_p0/flink_connector_type.groovy
b/regression-test/suites/flink_connector_p0/flink_connector_type.groovy
index ab0c308a137..d5a9d97d2bd 100644
--- a/regression-test/suites/flink_connector_p0/flink_connector_type.groovy
+++ b/regression-test/suites/flink_connector_p0/flink_connector_type.groovy
@@ -19,11 +19,8 @@
// /testing/trino-product-tests/src/main/resources/sql-tests/testcases/tpcds
// and modified by Doris.
-
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableResult
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
+import static java.util.concurrent.TimeUnit.SECONDS
+import org.awaitility.Awaitility
suite("flink_connector_type") {
@@ -112,77 +109,44 @@ VALUES
def thisDb = sql """select database()""";
thisDb = thisDb[0][0];
logger.info("current database is ${thisDb}");
- final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
- tEnv.executeSql(
- "CREATE TABLE source_doris (" +
- "`id` int,\n" +
- "`c1` boolean,\n" +
- "`c2` tinyint,\n" +
- "`c3` smallint,\n" +
- "`c4` int,\n" +
- "`c5` bigint,\n" +
- "`c6` string,\n" +
- "`c7` float,\n" +
- "`c8` double,\n" +
- "`c9` decimal(12,4),\n" +
- "`c10` date,\n" +
- "`c11` TIMESTAMP,\n" +
- "`c12` char(1),\n" +
- "`c13` varchar(256),\n" +
- "`c14` Array<String>,\n" +
- "`c15` Map<String, String>,\n" +
- "`c16` ROW<name String, age int>,\n" +
- "`c17` STRING,\n" +
- "`c18` STRING"
- + ") "
- + "WITH (\n"
- + " 'connector' = 'doris',\n"
- + " 'fenodes' = '" + context.config.feHttpAddress + "',\n"
- + " 'table.identifier' = '${thisDb}.test_types_source',\n"
- + " 'username' = '" + context.config.feHttpUser + "',\n"
- + " 'password' = '" + context.config.feHttpPassword +
"'\n"
- + ")");
+ logger.info("start delete local flink-doris-case.jar....")
+ def delete_local_flink_jar = "rm -rf flink-doris-case.jar".execute()
+ logger.info("start download regression/flink-doris-case.jar ....")
+ logger.info("getS3Url: ${getS3Url()}")
+ def download_flink_jar = "wget --quiet --continue --tries=5
${getS3Url()}/regression/flink-doris-case.jar".execute().getText()
+
+ def file = new File('flink-doris-case.jar')
+ if (file.exists()) {
+ def fileSize = file.length()
+ logger.info("finish download flink-doris-case.jar, size " + fileSize)
+ } else {
+ logger.info("flink-doris-case.jar download failed")
+ throw new Exception("File flink-doris-case.jar download failed.")
+ }
+
+ def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+ logger.info("System java path: ${javaPath}")
+ def javaVersion = System.getProperty("java.version")
+ logger.info("System java version: ${javaVersion}")
+ def addOpens = ""
+ if (javaVersion.startsWith("17")) {
+ addOpens = "--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens
java.base/java.lang=ALL-UNNAMED"
+ }
- tEnv.executeSql(
- "CREATE TABLE doris_test_sink (" +
- "`id` int,\n" +
- "`c1` boolean,\n" +
- "`c2` tinyint,\n" +
- "`c3` smallint,\n" +
- "`c4` int,\n" +
- "`c5` bigint,\n" +
- "`c6` string,\n" +
- "`c7` float,\n" +
- "`c8` double,\n" +
- "`c9` decimal(12,4),\n" +
- "`c10` date,\n" +
- "`c11` TIMESTAMP,\n" +
- "`c12` char(1),\n" +
- "`c13` varchar(256),\n" +
- "`c14` Array<String>,\n" +
- "`c15` Map<String, String>,\n" +
- "`c16` ROW<name String, age int>,\n" +
- "`c17` STRING,\n" +
- "`c18` STRING"
- + ") "
- + "WITH (\n"
- + " 'connector' = 'doris',\n"
- + " 'fenodes' = '" + context.config.feHttpAddress + "',\n"
- + " 'table.identifier' = '${thisDb}.test_types_sink',\n"
- + " 'username' = '" + context.config.feHttpUser + "',\n"
- + " 'password' = '" + context.config.feHttpPassword +
"',\n"
- + " 'sink.properties.format' = 'json',\n"
- + " 'sink.properties.read_json_by_line' = 'true',\n"
- + " 'sink.label-prefix' = 'label" + UUID.randomUUID() +
"'"
- + ")");
+ def run_cmd = "${javaPath} ${addOpens} -cp flink-doris-case.jar
org.apache.doris.FlinkConnectorTypeCase $context.config.feHttpAddress
regression_test_flink_connector_p0 $context.config.feHttpUser"
+ logger.info("run_cmd : $run_cmd")
+ def run_flink_jar = run_cmd.execute().getText()
+ logger.info("result: $run_flink_jar")
+ // The publish in the commit phase is asynchronous
+ Awaitility.await().atMost(30, SECONDS).pollInterval(1,
SECONDS).await().until(
+ {
+ def resultTbl = sql """ select count(1) from test_types_sink"""
+ logger.info("retry test_types_sink count: $resultTbl")
+ resultTbl.size() >= 1
+ })
- TableResult tableResult = tEnv.executeSql("INSERT INTO doris_test_sink
select * from source_doris");
- tableResult.await();
logger.info("flink job execute finished.");
qt_select """ select * from test_types_sink order by id"""
}
diff --git a/run-regression-test.sh b/run-regression-test.sh
index 16256152887..a026822ca18 100755
--- a/run-regression-test.sh
+++ b/run-regression-test.sh
@@ -229,4 +229,4 @@ if [[ "${ONLY_COMPILE}" -eq 0 ]]; then
-jar ${RUN_JAR:+${RUN_JAR}} \
-cf "${CONFIG_FILE}" \
${REGRESSION_OPTIONS_PREFIX:+${REGRESSION_OPTIONS_PREFIX}} "$@"
-fi
+fi
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]