This is an automated email from the ASF dual-hosted git repository.
achao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new dda8730dd [Improve] flink on yarn e2e testcase (#3860)
dda8730dd is described below
commit dda8730dde542d00e3ed053d7dc4c3e053a17271
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 14 21:19:20 2024 +0800
[Improve] flink on yarn e2e testcase (#3860)
* [Improve] flink on yarn e2e testcase
* [Improve] Try fix e2e ci failed
* [Bug] ChildFirstClassloader loadClass bug fixed
* [Improve] load commons-cli classes improvements
* [Improve] flink-sql on yarn test-case classes name improvements
---
.github/workflows/e2e.yml | 12 ++---
...linkSQLTest.java => FlinkSQL116OnYarnTest.java} | 14 +++---
...linkSQLTest.java => FlinkSQL117OnYarnTest.java} | 14 +++---
...linkSQLTest.java => FlinkSQL118OnYarnTest.java} | 18 +++----
.../flink/proxy/ChildFirstClassLoader.scala | 58 ++++++++++++++--------
.../streampark/flink/proxy/FlinkShimsProxy.scala | 23 +++------
6 files changed, 73 insertions(+), 66 deletions(-)
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index f908b05d5..755e7e275 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -124,12 +124,12 @@ jobs:
class: org.apache.streampark.e2e.cases.TeamManagementTest
- name: MemberManagementTest
class: org.apache.streampark.e2e.cases.MemberManagementTest
- - name: ApplicationsFlink116OnYarnWithFlinkSQLTest
- class:
org.apache.streampark.e2e.cases.ApplicationsFlink116OnYarnWithFlinkSQLTest
- - name: ApplicationsFlink117OnYarnWithFlinkSQLTest
- class:
org.apache.streampark.e2e.cases.ApplicationsFlink117OnYarnWithFlinkSQLTest
- - name: ApplicationsFlink118OnYarnWithFlinkSQLTest
- class:
org.apache.streampark.e2e.cases.ApplicationsFlink118OnYarnWithFlinkSQLTest
+ - name: FlinkSQL116OnYarnTest
+ class: org.apache.streampark.e2e.cases.FlinkSQL116OnYarnTest
+ - name: FlinkSQL117OnYarnTest
+ class: org.apache.streampark.e2e.cases.FlinkSQL117OnYarnTest
+ - name: FlinkSQL118OnYarnTest
+ class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest
env:
RECORDING_PATH: /tmp/recording-${{ matrix.case.name }}
steps:
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
similarity index 98%
rename from
streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
rename to
streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
index 2b0427016..92d4c5dae 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
@@ -38,7 +38,7 @@ import static
org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
import static org.assertj.core.api.Assertions.assertThat;
@StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml")
-public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
+public class FlinkSQL116OnYarnTest {
private static RemoteWebDriver browser;
@@ -106,8 +106,8 @@ public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("SUCCESS")));
}
- // @Test
- // @Order(30)
+ @Test
+ @Order(30)
void testStartFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
@@ -128,8 +128,8 @@ public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("FINISHED")));
}
- // @Test
- // @Order(31)
+ @Test
+ @Order(31)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
@@ -210,8 +210,8 @@ public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("SUCCESS")));
}
- // @Test
- // @Order(70)
+ @Test
+ @Order(70)
void testStartFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
similarity index 98%
rename from
streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
rename to
streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
index 9fb19880e..b98fae390 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
@@ -38,7 +38,7 @@ import static
org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
import static org.assertj.core.api.Assertions.assertThat;
@StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml")
-public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
+public class FlinkSQL117OnYarnTest {
private static RemoteWebDriver browser;
@@ -106,8 +106,8 @@ public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("SUCCESS")));
}
- // @Test
- // @Order(30)
+ @Test
+ @Order(30)
void testStartFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
@@ -128,8 +128,8 @@ public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("FINISHED")));
}
- // @Test
- // @Order(31)
+ @Test
+ @Order(31)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
@@ -211,8 +211,8 @@ public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("SUCCESS")));
}
- // @Test
- // @Order(70)
+ @Test
+ @Order(70)
void testStartFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
diff --git
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnWithFlinkSQLTest.java
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
similarity index 98%
rename from
streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnWithFlinkSQLTest.java
rename to
streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
index c252fb089..a91da141a 100644
---
a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnWithFlinkSQLTest.java
+++
b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
@@ -38,7 +38,7 @@ import static
org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
import static org.assertj.core.api.Assertions.assertThat;
@StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml")
-public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
+public class FlinkSQL118OnYarnTest {
private static RemoteWebDriver browser;
@@ -106,8 +106,8 @@ public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("SUCCESS")));
}
- // @Test
- // @Order(30)
+ @Test
+ @Order(30)
void testStartFlinkApplicationOnYarnApplicationMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
@@ -128,8 +128,8 @@ public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("FINISHED")));
}
- // @Test
- // @Order(31)
+ @Test
+ @Order(31)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
@@ -211,8 +211,8 @@ public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("SUCCESS")));
}
- // @Test
- // @Order(70)
+ @Test
+ @Order(70)
void testStartFlinkApplicationOnYarnPerJobMode() {
final ApplicationsPage applicationsPage = new
ApplicationsPage(browser);
@@ -233,8 +233,8 @@ public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
.anyMatch(it -> it.contains("FINISHED")));
}
- // @Test
- // @Order(71)
+ @Test
+ @Order(71)
@SneakyThrows
void testRestartAndCancelFlinkApplicationOnYarnPerJobMode() {
Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
index 5e572cbaf..ec8f360b5 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
+++
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
@@ -20,9 +20,9 @@ package org.apache.streampark.flink.proxy
import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.util
+import java.util.function.Consumer
import java.util.regex.Pattern
-import scala.language.existentials
import scala.util.Try
/**
@@ -33,11 +33,25 @@ import scala.util.Try
* we don't override that.
*/
-class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader,
flinkResourcePattern: Pattern)
+class ChildFirstClassLoader(
+ urls: Array[URL],
+ parent: ClassLoader,
+ flinkResourcePattern: Pattern,
+ classLoadingExceptionHandler: Consumer[Throwable])
extends URLClassLoader(urls, parent) {
ClassLoader.registerAsParallelCapable()
+ def this(urls: Array[URL], parent: ClassLoader, flinkResourcePattern:
Pattern) {
+ this(
+ urls,
+ parent,
+ flinkResourcePattern,
+ (t: Throwable) => throw t)
+ }
+
+ ClassLoader.registerAsParallelCapable()
+
private val FLINK_PATTERN =
Pattern.compile("flink-(.*).jar", Pattern.CASE_INSENSITIVE |
Pattern.DOTALL)
@@ -50,6 +64,7 @@ class ChildFirstClassLoader(urls: Array[URL], parent:
ClassLoader, flinkResource
"org.apache.log4j",
"org.apache.logging",
"org.apache.commons.logging",
+ "org.apache.commons.cli",
"ch.qos.logback",
"org.xml",
"org.w3c",
@@ -57,24 +72,27 @@ class ChildFirstClassLoader(urls: Array[URL], parent:
ClassLoader, flinkResource
@throws[ClassNotFoundException]
override def loadClass(name: String, resolve: Boolean): Class[_] = {
- this.synchronized {
- // First, check if the class has already been loaded
- val clazz = super.findLoadedClass(name) match {
- case null =>
- // check whether the class should go parent-first
- for (parentFirstPattern <- PARENT_FIRST_PATTERNS) {
- if (name.startsWith(parentFirstPattern)) {
- super.loadClass(name, resolve)
+ try {
+ this.synchronized {
+ // First, check if the class has already been loaded
+ super.findLoadedClass(name) match {
+ case null =>
+ // check whether the class should go parent-first
+ PARENT_FIRST_PATTERNS.find(name.startsWith) match {
+ case Some(_) => super.loadClass(name, resolve)
+ case _ => Try(findClass(name)).getOrElse(super.loadClass(name,
resolve))
+ }
+ case c =>
+ if (resolve) {
+ resolveClass(c)
}
- }
- Try(findClass(name)).getOrElse(super.loadClass(name, resolve))
- case c: Class[_] =>
- if (resolve) {
- resolveClass(c)
- }
- c
+ c
+ }
}
- clazz
+ } catch {
+ case e: Throwable =>
+ classLoadingExceptionHandler.accept(e)
+ null
}
}
@@ -99,9 +117,7 @@ class ChildFirstClassLoader(urls: Array[URL], parent:
ClassLoader, flinkResource
val spec = urlClassLoaderResource.getFile
val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
val matchState =
- FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern
- .matcher(filename)
- .matches
+ FLINK_PATTERN.matcher(filename).matches &&
!flinkResourcePattern.matcher(filename).matches
if (matchState) {
return null
}
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 051f53fdf..1d95303ac 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -33,13 +33,9 @@ object FlinkShimsProxy extends Logger {
private[this] val SHIMS_CLASS_LOADER_CACHE = MutableMap[String,
ClassLoader]()
- private[this] val VERIFY_SQL_CLASS_LOADER_CACHE =
- MutableMap[String, ClassLoader]()
+ private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String,
ClassLoader]()
- private[this] val INCLUDE_PATTERN: Pattern =
- Pattern.compile(
- "(streampark-shaded-jackson-)(.*).jar",
- Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
+ private[this] val INCLUDE_PATTERN: Pattern =
Pattern.compile("(streampark-shaded-jackson-)(.*).jar",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
private[this] def getFlinkShimsResourcePattern(majorVersion: String) =
Pattern.compile(s"flink-(.*)-$majorVersion(.*).jar",
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
@@ -58,8 +54,7 @@ object FlinkShimsProxy extends Logger {
*/
def proxy[T](flinkVersion: FlinkVersion, func: ClassLoader => T): T = {
val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
- ClassLoaderUtils
- .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+ ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () =>
func(shimsClassLoader))
}
/**
@@ -74,8 +69,7 @@ object FlinkShimsProxy extends Logger {
*/
def proxy[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T
= {
val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
- ClassLoaderUtils
- .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+ ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () =>
func(shimsClassLoader))
}
// need to load all flink-table dependencies compatible with different
versions
@@ -85,12 +79,10 @@ object FlinkShimsProxy extends Logger {
s"${flinkVersion.fullVersion}", {
val getFlinkTable: File => Boolean =
_.getName.startsWith("flink-table")
// 1) flink/lib/flink-table*
- val libTableURL =
- getFlinkHomeLib(flinkVersion.flinkHome, "lib", getFlinkTable)
+ val libTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib",
getFlinkTable)
// 2) After version 1.15 need add flink/opt/flink-table*
- val optTableURL =
- getFlinkHomeLib(flinkVersion.flinkHome, "opt", getFlinkTable)
+ val optTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "opt",
getFlinkTable)
val shimsUrls = ListBuffer[URL](libTableURL ++ optTableURL: _*)
// 3) add only streampark shims jar
@@ -158,8 +150,7 @@ object FlinkShimsProxy extends Logger {
*/
def proxyVerifySql[T](flinkVersion: FlinkVersion, func:
JavaFunc[ClassLoader, T]): T = {
val shimsClassLoader = getVerifySqlLibClassLoader(flinkVersion)
- ClassLoaderUtils
- .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+ ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () =>
func(shimsClassLoader))
}
private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion):
ClassLoader = {