This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 97c5504f716 branch-4.0: [improve](case) Fix spark flink case in jdk17 #56992 (#57080)
97c5504f716 is described below

commit 97c5504f7168199885b222f813612ef4d2dc9e24
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Oct 17 14:59:25 2025 +0800

    branch-4.0: [improve](case) Fix spark flink case in jdk17 #56992 (#57080)
    
    Cherry-picked from #56992
    
    Co-authored-by: wudi <[email protected]>
---
 .../spark_connector/spark_connector.groovy         |  41 --------
 .../spark_connector/spark_connector_arrow.groovy   |  15 ++-
 .../spark_connector_read_type.groovy               |  16 ++-
 .../flink_connector_p0/flink_connector.groovy      |   8 +-
 .../flink_connector_syncdb.groovy                  |  14 ++-
 .../flink_connector_p0/flink_connector_type.groovy | 108 +++++++--------------
 run-regression-test.sh                             |   2 +-
 7 files changed, 81 insertions(+), 123 deletions(-)

diff --git a/regression-test/suites/connector_p0/spark_connector/spark_connector.groovy b/regression-test/suites/connector_p0/spark_connector/spark_connector.groovy
deleted file mode 100644
index 06699d7c8ff..00000000000
--- a/regression-test/suites/connector_p0/spark_connector/spark_connector.groovy
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite("spark_connector", "connector") {
-
-    def tableName = "spark_connector"
-    sql """DROP TABLE IF EXISTS ${tableName}"""
-    sql new File("""${context.file.parent}/ddl/create.sql""").text
-    logger.info("start delete local spark doris demo jar...")
-    def delete_local_spark_jar = "rm -rf spark-doris-demo.jar".execute()
-    logger.info("start download spark doris demo ...")
-    logger.info("getS3Url ==== ${getS3Url()}")
-    def download_spark_jar = "/usr/bin/curl 
${getS3Url()}/regression/spark-doris-connector-demo-jar-with-dependencies.jar 
--output spark-doris-demo.jar".execute().getText()
-    def out = "/usr/bin/ls -al spark-doris-demo.jar".execute().getText()
-    logger.info("finish download spark doris demo, out: ${out}")
-    def run_cmd = "java -jar spark-doris-demo.jar 
$context.config.feHttpAddress $context.config.feHttpUser 
regression_test_connector_p0_spark_connector.$tableName"
-    logger.info("run_cmd : $run_cmd")
-    def proc = run_cmd.execute()
-    def sout = new StringBuilder()
-    def serr = new StringBuilder()
-    proc.consumeProcessOutput(sout, serr)
-    proc.waitForOrKill(1200_000)
-    if (proc.exitValue() != 0) {
-      logger.warn("failed to execute jar: code=${proc.exitValue()}, " + 
"output: ${sout.toString()}, error: ${serr.toString()}")
-    }
-    qt_select """ select * from $tableName order by order_id"""
-}
diff --git a/regression-test/suites/connector_p0/spark_connector/spark_connector_arrow.groovy b/regression-test/suites/connector_p0/spark_connector/spark_connector_arrow.groovy
index a5fbc3b2835..7d9d07e7af3 100644
--- a/regression-test/suites/connector_p0/spark_connector/spark_connector_arrow.groovy
+++ b/regression-test/suites/connector_p0/spark_connector/spark_connector_arrow.groovy
@@ -126,7 +126,7 @@ suite("spark_connector_for_arrow", "connector") {
     sql """DELETE FROM spark_connector_map where id > 0"""
     sql """DELETE FROM spark_connector_struct where id > 0"""
 
-    def jar_name = 
"spark-doris-connector-3.1_2.12-1.3.0-SNAPSHOT-with-dependencies.jar"
+    def jar_name = "spark-doris-connector-3.4_2.12-1.3.0-SNAPSHOT.jar"
 
     logger.info("start delete local spark doris demo jar...")
     def delete_local_spark_jar = "rm -rf ${jar_name}".execute()
@@ -134,7 +134,18 @@ suite("spark_connector_for_arrow", "connector") {
     logger.info("getS3Url ==== ${getS3Url()}")
     def download_spark_jar = "/usr/bin/curl 
${getS3Url()}/regression/${jar_name} --output ${jar_name}".execute().getText()
     logger.info("finish download spark doris demo ...")
-    def run_cmd = "java -cp ${jar_name} 
org.apache.doris.spark.testcase.TestStreamLoadForArrowType 
$context.config.feHttpAddress $context.config.feHttpUser 
regression_test_connector_p0_spark_connector"
+
+    def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+    logger.info("System java path: ${javaPath}")
+
+    def javaVersion = System.getProperty("java.version")
+    logger.info("System java version: ${javaVersion}")
+    def addOpens = ""
+    if (javaVersion.startsWith("17")) {
+        addOpens = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED  
--add-opens=java.base/java.nio=ALL-UNNAMED"
+    }
+
+    def run_cmd = "${javaPath} ${addOpens} -cp ${jar_name} 
org.apache.doris.spark.testcase.TestStreamLoadForArrowType 
$context.config.feHttpAddress $context.config.feHttpUser 
regression_test_connector_p0_spark_connector"
     logger.info("run_cmd : $run_cmd")
     def proc = run_cmd.execute()
     def sout = new StringBuilder()
diff --git a/regression-test/suites/connector_p0/spark_connector/spark_connector_read_type.groovy b/regression-test/suites/connector_p0/spark_connector/spark_connector_read_type.groovy
index 632e5e3d401..03345335105 100644
--- a/regression-test/suites/connector_p0/spark_connector/spark_connector_read_type.groovy
+++ b/regression-test/suites/connector_p0/spark_connector/spark_connector_read_type.groovy
@@ -97,7 +97,18 @@ suite("spark_connector_read_type", "connector") {
     logger.info("getS3Url ==== ${getS3Url()}")
     def download_spark_jar = "/usr/bin/curl 
${getS3Url()}/regression/spark-doris-read-jar-with-dependencies.jar --output 
spark-doris-read.jar".execute().getText()
     logger.info("finish download spark doris demo ...")
-    def run_cmd = "java -jar spark-doris-read.jar 
$context.config.feHttpAddress $context.config.feHttpUser 
regression_test_connector_p0_spark_connector.$tableReadName 
regression_test_connector_p0_spark_connector.$tableWriterName"
+
+    def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+    logger.info("System java path: ${javaPath}")
+
+    def javaVersion = System.getProperty("java.version")
+    logger.info("System java version: ${javaVersion}")
+    def addOpens = ""
+    if (javaVersion.startsWith("17")) {
+        addOpens = "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED  
--add-opens=java.base/java.nio=ALL-UNNAMED 
--add-opens=java.base/sun.util.calendar=ALL-UNNAMED"
+    }
+
+    def run_cmd = "${javaPath} ${addOpens} -jar spark-doris-read.jar 
$context.config.feHttpAddress $context.config.feHttpUser 
regression_test_connector_p0_spark_connector.$tableReadName 
regression_test_connector_p0_spark_connector.$tableWriterName"
     logger.info("run_cmd : $run_cmd")
     def proc = run_cmd.execute()
     def sout = new StringBuilder()
@@ -109,7 +120,4 @@ suite("spark_connector_read_type", "connector") {
     }
 
     qt_select """ select * from $tableWriterName order by id"""
-
-
-
 }
diff --git a/regression-test/suites/flink_connector_p0/flink_connector.groovy b/regression-test/suites/flink_connector_p0/flink_connector.groovy
index 5981895d8ae..93b0552919b 100644
--- a/regression-test/suites/flink_connector_p0/flink_connector.groovy
+++ b/regression-test/suites/flink_connector_p0/flink_connector.groovy
@@ -42,7 +42,13 @@ suite("flink_connector") {
         return
     }
 
-    def run_cmd = "java -cp flink-doris-demo.jar 
com.doris.DorisFlinkDfSinkDemo $context.config.feHttpAddress 
regression_test_flink_connector_p0.$tableName $context.config.feHttpUser"
+    def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+    logger.info("System java path: ${javaPath}")
+
+    def javaVersion = System.getProperty("java.version")
+    logger.info("System java version: ${javaVersion}")
+
+    def run_cmd = "${javaPath} -cp flink-doris-demo.jar 
com.doris.DorisFlinkDfSinkDemo $context.config.feHttpAddress 
regression_test_flink_connector_p0.$tableName $context.config.feHttpUser"
     logger.info("run_cmd : $run_cmd")
     def run_flink_jar = run_cmd.execute().getText()
     logger.info("result: $run_flink_jar")
diff --git a/regression-test/suites/flink_connector_p0/flink_connector_syncdb.groovy b/regression-test/suites/flink_connector_p0/flink_connector_syncdb.groovy
index 0496eab75ae..ad72bbfaa4e 100644
--- a/regression-test/suites/flink_connector_p0/flink_connector_syncdb.groovy
+++ b/regression-test/suites/flink_connector_p0/flink_connector_syncdb.groovy
@@ -74,7 +74,17 @@ PROPERTIES (
         throw new Exception("File flink-doris-syncdb.jar download failed.")
     }
 
-    def run_cmd = "java -cp flink-doris-syncdb.jar 
org.apache.doris.DatabaseFullSync $context.config.feHttpAddress 
regression_test_flink_connector_p0 $context.config.feHttpUser"
+    def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+    logger.info("System java path: ${javaPath}")
+    def javaVersion = System.getProperty("java.version")
+    logger.info("System java version: ${javaVersion}")
+
+    def addOpens = ""
+    if (javaVersion.startsWith("17")) {
+        addOpens = "--add-opens=java.base/java.nio=ALL-UNNAMED --add-opens 
java.base/java.lang=ALL-UNNAMED"
+    }
+
+    def run_cmd = "${javaPath} ${addOpens} -cp flink-doris-syncdb.jar 
org.apache.doris.DatabaseFullSync $context.config.feHttpAddress 
regression_test_flink_connector_p0 $context.config.feHttpUser"
     logger.info("run_cmd : $run_cmd")
     def run_flink_jar = run_cmd.execute().getText()
     logger.info("result: $run_flink_jar")
@@ -85,7 +95,7 @@ PROPERTIES (
                 logger.info("retry $tableName1  count: $resultTbl1")
                 def resultTbl2 = sql """ select count(1) from $tableName2"""
                 logger.info("retry $tableName2 count: $resultTbl2")
-                resultTbl1.size() >= 1 && resultTbl2.size >=1
+                resultTbl1.size() >= 1 && resultTbl2.size() >=1
             })
 
     qt_select """ select * from $tableName1 order by id"""
diff --git a/regression-test/suites/flink_connector_p0/flink_connector_type.groovy b/regression-test/suites/flink_connector_p0/flink_connector_type.groovy
index ab0c308a137..d5a9d97d2bd 100644
--- a/regression-test/suites/flink_connector_p0/flink_connector_type.groovy
+++ b/regression-test/suites/flink_connector_p0/flink_connector_type.groovy
@@ -19,11 +19,8 @@
 // /testing/trino-product-tests/src/main/resources/sql-tests/testcases/tpcds
 // and modified by Doris.
 
-
-import org.apache.flink.api.common.RuntimeExecutionMode
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableResult
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
+import static java.util.concurrent.TimeUnit.SECONDS
+import org.awaitility.Awaitility
 
 suite("flink_connector_type") {
 
@@ -112,77 +109,44 @@ VALUES
     def thisDb = sql """select database()""";
     thisDb = thisDb[0][0];
     logger.info("current database is ${thisDb}");
-    final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-    env.setParallelism(1);
-    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
-    final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
-    tEnv.executeSql(
-            "CREATE TABLE source_doris (" +
-                    "`id` int,\n" +
-                    "`c1` boolean,\n" +
-                    "`c2` tinyint,\n" +
-                    "`c3` smallint,\n" +
-                    "`c4` int,\n" +
-                    "`c5` bigint,\n" +
-                    "`c6` string,\n" +
-                    "`c7` float,\n" +
-                    "`c8` double,\n" +
-                    "`c9` decimal(12,4),\n" +
-                    "`c10` date,\n" +
-                    "`c11` TIMESTAMP,\n" +
-                    "`c12` char(1),\n" +
-                    "`c13` varchar(256),\n" +
-                    "`c14` Array<String>,\n" +
-                    "`c15` Map<String, String>,\n" +
-                    "`c16` ROW<name String, age int>,\n" +
-                    "`c17` STRING,\n" +
-                    "`c18` STRING"
-                    + ") "
-                    + "WITH (\n"
-                    + "  'connector' = 'doris',\n"
-                    + "  'fenodes' = '" + context.config.feHttpAddress + "',\n"
-                    + "  'table.identifier' = '${thisDb}.test_types_source',\n"
-                    + "  'username' = '" + context.config.feHttpUser + "',\n"
-                    + "  'password' = '" + context.config.feHttpPassword +  
"'\n"
-                    + ")");
+    logger.info("start delete local flink-doris-case.jar....")
+    def delete_local_flink_jar = "rm -rf flink-doris-case.jar".execute()
+    logger.info("start download regression/flink-doris-case.jar ....")
+    logger.info("getS3Url: ${getS3Url()}")
+    def download_flink_jar = "wget --quiet --continue --tries=5 
${getS3Url()}/regression/flink-doris-case.jar".execute().getText()
+
+    def file = new File('flink-doris-case.jar')
+    if (file.exists()) {
+        def fileSize = file.length()
+        logger.info("finish download flink-doris-case.jar, size " + fileSize)
+    } else {
+        logger.info("flink-doris-case.jar download failed")
+        throw new Exception("File flink-doris-case.jar download failed.")
+    }
+
+    def javaPath = ["bash", "-c", "which java"].execute().text.trim()
+    logger.info("System java path: ${javaPath}")
+    def javaVersion = System.getProperty("java.version")
+    logger.info("System java version: ${javaVersion}")
 
+    def addOpens = ""
+    if (javaVersion.startsWith("17")) {
+        addOpens = "--add-opens=java.base/java.nio=ALL-UNNAMED  --add-opens 
java.base/java.lang=ALL-UNNAMED"
+    }
 
-    tEnv.executeSql(
-            "CREATE TABLE doris_test_sink (" +
-                    "`id` int,\n" +
-                    "`c1` boolean,\n" +
-                    "`c2` tinyint,\n" +
-                    "`c3` smallint,\n" +
-                    "`c4` int,\n" +
-                    "`c5` bigint,\n" +
-                    "`c6` string,\n" +
-                    "`c7` float,\n" +
-                    "`c8` double,\n" +
-                    "`c9` decimal(12,4),\n" +
-                    "`c10` date,\n" +
-                    "`c11` TIMESTAMP,\n" +
-                    "`c12` char(1),\n" +
-                    "`c13` varchar(256),\n" +
-                    "`c14` Array<String>,\n" +
-                    "`c15` Map<String, String>,\n" +
-                    "`c16` ROW<name String, age int>,\n" +
-                    "`c17` STRING,\n" +
-                    "`c18` STRING"
-                    + ") "
-                    + "WITH (\n"
-                    + "  'connector' = 'doris',\n"
-                    + "  'fenodes' = '" + context.config.feHttpAddress + "',\n"
-                    + "  'table.identifier' = '${thisDb}.test_types_sink',\n"
-                    + "  'username' = '" + context.config.feHttpUser + "',\n"
-                    + "  'password' = '" + context.config.feHttpPassword +  
"',\n"
-                    + "  'sink.properties.format' = 'json',\n"
-                    + "  'sink.properties.read_json_by_line' = 'true',\n"
-                    + "  'sink.label-prefix' = 'label" + UUID.randomUUID() + 
"'"
-                    + ")");
+    def run_cmd = "${javaPath} ${addOpens} -cp flink-doris-case.jar 
org.apache.doris.FlinkConnectorTypeCase $context.config.feHttpAddress 
regression_test_flink_connector_p0 $context.config.feHttpUser"
+    logger.info("run_cmd : $run_cmd")
+    def run_flink_jar = run_cmd.execute().getText()
+    logger.info("result: $run_flink_jar")
+    // The publish in the commit phase is asynchronous
+    Awaitility.await().atMost(30, SECONDS).pollInterval(1, 
SECONDS).await().until(
+            {
+                def resultTbl = sql """ select count(1) from test_types_sink"""
+                logger.info("retry test_types_sink  count: $resultTbl")
+                resultTbl.size() >= 1
+            })
 
-    TableResult tableResult = tEnv.executeSql("INSERT INTO doris_test_sink 
select * from source_doris");
-    tableResult.await();
     logger.info("flink job execute finished.");
     qt_select """ select * from test_types_sink order by id"""
 }
diff --git a/run-regression-test.sh b/run-regression-test.sh
index 16256152887..a026822ca18 100755
--- a/run-regression-test.sh
+++ b/run-regression-test.sh
@@ -229,4 +229,4 @@ if [[ "${ONLY_COMPILE}" -eq 0 ]]; then
         -jar ${RUN_JAR:+${RUN_JAR}} \
         -cf "${CONFIG_FILE}" \
         ${REGRESSION_OPTIONS_PREFIX:+${REGRESSION_OPTIONS_PREFIX}} "$@"
-fi
+fi
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to