This is an automated email from the ASF dual-hosted git repository.

achao pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new dda8730dd [Improve] flink on yarn e2e testcase (#3860)
dda8730dd is described below

commit dda8730dde542d00e3ed053d7dc4c3e053a17271
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 14 21:19:20 2024 +0800

    [Improve] flink on yarn e2e testcase (#3860)
    
    * [Improve] flink on yarn e2e testcase
    
    * [Improve] Try fix e2e ci failed
    
    * [Bug] ChildFirstClassloader loadClass bug fixed
    
    * [Improve] load commons-cli classes improvements
    
    * [Improve] flink-sql on yarn test-case classes name improvements
---
 .github/workflows/e2e.yml                          | 12 ++---
 ...linkSQLTest.java => FlinkSQL116OnYarnTest.java} | 14 +++---
 ...linkSQLTest.java => FlinkSQL117OnYarnTest.java} | 14 +++---
 ...linkSQLTest.java => FlinkSQL118OnYarnTest.java} | 18 +++----
 .../flink/proxy/ChildFirstClassLoader.scala        | 58 ++++++++++++++--------
 .../streampark/flink/proxy/FlinkShimsProxy.scala   | 23 +++------
 6 files changed, 73 insertions(+), 66 deletions(-)

diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index f908b05d5..755e7e275 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -124,12 +124,12 @@ jobs:
             class: org.apache.streampark.e2e.cases.TeamManagementTest
           - name: MemberManagementTest
             class: org.apache.streampark.e2e.cases.MemberManagementTest
-          - name: ApplicationsFlink116OnYarnWithFlinkSQLTest
-            class: org.apache.streampark.e2e.cases.ApplicationsFlink116OnYarnWithFlinkSQLTest
-          - name: ApplicationsFlink117OnYarnWithFlinkSQLTest
-            class: org.apache.streampark.e2e.cases.ApplicationsFlink117OnYarnWithFlinkSQLTest
-          - name: ApplicationsFlink118OnYarnWithFlinkSQLTest
-            class: org.apache.streampark.e2e.cases.ApplicationsFlink118OnYarnWithFlinkSQLTest
+          - name: FlinkSQL116OnYarnTest
+            class: org.apache.streampark.e2e.cases.FlinkSQL116OnYarnTest
+          - name: FlinkSQL117OnYarnTest
+            class: org.apache.streampark.e2e.cases.FlinkSQL117OnYarnTest
+          - name: FlinkSQL118OnYarnTest
+            class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest
+          - name: FlinkSQL116OnYarnTest
+            class: org.apache.streampark.e2e.cases.FlinkSQL116OnYarnTest
+          - name: FlinkSQL117OnYarnTest
+            class: org.apache.streampark.e2e.cases.FlinkSQL117OnYarnTest
+          - name: FlinkSQL118OnYarnTest
+            class: org.apache.streampark.e2e.cases.FlinkSQL118OnYarnTest
     env:
       RECORDING_PATH: /tmp/recording-${{ matrix.case.name }}
     steps:
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
similarity index 98%
rename from streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
rename to streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
index 2b0427016..92d4c5dae 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink116OnYarnWithFlinkSQLTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL116OnYarnTest.java
@@ -38,7 +38,7 @@ import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 @StreamPark(composeFiles = "docker/flink-1.16-on-yarn/docker-compose.yaml")
-public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
+public class FlinkSQL116OnYarnTest {
 
     private static RemoteWebDriver browser;
 
@@ -106,8 +106,8 @@ public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("SUCCESS")));
     }
 
-    // @Test
-    // @Order(30)
+    @Test
+    @Order(30)
     void testStartFlinkApplicationOnYarnApplicationMode() {
         final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
 
@@ -128,8 +128,8 @@ public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("FINISHED")));
     }
 
-    // @Test
-    // @Order(31)
+    @Test
+    @Order(31)
     @SneakyThrows
     void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() {
         Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
@@ -210,8 +210,8 @@ public class ApplicationsFlink116OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("SUCCESS")));
     }
 
-    // @Test
-    // @Order(70)
+    @Test
+    @Order(70)
     void testStartFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
 
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
similarity index 98%
rename from streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
rename to streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
index 9fb19880e..b98fae390 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink117OnYarnWithFlinkSQLTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL117OnYarnTest.java
@@ -38,7 +38,7 @@ import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 @StreamPark(composeFiles = "docker/flink-1.17-on-yarn/docker-compose.yaml")
-public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
+public class FlinkSQL117OnYarnTest {
 
     private static RemoteWebDriver browser;
 
@@ -106,8 +106,8 @@ public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("SUCCESS")));
     }
 
-    // @Test
-    // @Order(30)
+    @Test
+    @Order(30)
     void testStartFlinkApplicationOnYarnApplicationMode() {
         final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
 
@@ -128,8 +128,8 @@ public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("FINISHED")));
     }
 
-    // @Test
-    // @Order(31)
+    @Test
+    @Order(31)
     @SneakyThrows
     void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() {
         Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
@@ -211,8 +211,8 @@ public class ApplicationsFlink117OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("SUCCESS")));
     }
 
-    // @Test
-    // @Order(70)
+    @Test
+    @Order(70)
     void testStartFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
 
diff --git a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnWithFlinkSQLTest.java b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
similarity index 98%
rename from streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnWithFlinkSQLTest.java
rename to streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
index c252fb089..a91da141a 100644
--- a/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/ApplicationsFlink118OnYarnWithFlinkSQLTest.java
+++ b/streampark-e2e/streampark-e2e-case/src/test/java/org/apache/streampark/e2e/cases/FlinkSQL118OnYarnTest.java
@@ -38,7 +38,7 @@ import static org.apache.streampark.e2e.core.Constants.TEST_FLINK_SQL;
 import static org.assertj.core.api.Assertions.assertThat;
 
 @StreamPark(composeFiles = "docker/flink-1.18-on-yarn/docker-compose.yaml")
-public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
+public class FlinkSQL118OnYarnTest {
 
     private static RemoteWebDriver browser;
 
@@ -106,8 +106,8 @@ public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("SUCCESS")));
     }
 
-    // @Test
-    // @Order(30)
+    @Test
+    @Order(30)
     void testStartFlinkApplicationOnYarnApplicationMode() {
         final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
 
@@ -128,8 +128,8 @@ public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("FINISHED")));
     }
 
-    // @Test
-    // @Order(31)
+    @Test
+    @Order(31)
     @SneakyThrows
     void testRestartAndCancelFlinkApplicationOnYarnApplicationMode() {
         Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
@@ -211,8 +211,8 @@ public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("SUCCESS")));
     }
 
-    // @Test
-    // @Order(70)
+    @Test
+    @Order(70)
     void testStartFlinkApplicationOnYarnPerJobMode() {
         final ApplicationsPage applicationsPage = new 
ApplicationsPage(browser);
 
@@ -233,8 +233,8 @@ public class ApplicationsFlink118OnYarnWithFlinkSQLTest {
                     .anyMatch(it -> it.contains("FINISHED")));
     }
 
-    // @Test
-    // @Order(71)
+    @Test
+    @Order(71)
     @SneakyThrows
     void testRestartAndCancelFlinkApplicationOnYarnPerJobMode() {
         Thread.sleep(Constants.DEFAULT_SLEEP_MILLISECONDS);
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
index 5e572cbaf..ec8f360b5 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
@@ -20,9 +20,9 @@ package org.apache.streampark.flink.proxy
 import java.io.{File, IOException}
 import java.net.{URL, URLClassLoader}
 import java.util
+import java.util.function.Consumer
 import java.util.regex.Pattern
 
-import scala.language.existentials
 import scala.util.Try
 
 /**
@@ -33,11 +33,25 @@ import scala.util.Try
  * we don't override that.
  */
 
-class ChildFirstClassLoader(urls: Array[URL], parent: ClassLoader, 
flinkResourcePattern: Pattern)
+class ChildFirstClassLoader(
+    urls: Array[URL],
+    parent: ClassLoader,
+    flinkResourcePattern: Pattern,
+    classLoadingExceptionHandler: Consumer[Throwable])
   extends URLClassLoader(urls, parent) {
 
   ClassLoader.registerAsParallelCapable()
 
+  def this(urls: Array[URL], parent: ClassLoader, flinkResourcePattern: 
Pattern) {
+    this(
+      urls,
+      parent,
+      flinkResourcePattern,
+      (t: Throwable) => throw t)
+  }
+
+  ClassLoader.registerAsParallelCapable()
+
   private val FLINK_PATTERN =
     Pattern.compile("flink-(.*).jar", Pattern.CASE_INSENSITIVE | 
Pattern.DOTALL)
 
@@ -50,6 +64,7 @@ class ChildFirstClassLoader(urls: Array[URL], parent: 
ClassLoader, flinkResource
     "org.apache.log4j",
     "org.apache.logging",
     "org.apache.commons.logging",
+    "org.apache.commons.cli",
     "ch.qos.logback",
     "org.xml",
     "org.w3c",
@@ -57,24 +72,27 @@ class ChildFirstClassLoader(urls: Array[URL], parent: 
ClassLoader, flinkResource
 
   @throws[ClassNotFoundException]
   override def loadClass(name: String, resolve: Boolean): Class[_] = {
-    this.synchronized {
-      // First, check if the class has already been loaded
-      val clazz = super.findLoadedClass(name) match {
-        case null =>
-          // check whether the class should go parent-first
-          for (parentFirstPattern <- PARENT_FIRST_PATTERNS) {
-            if (name.startsWith(parentFirstPattern)) {
-              super.loadClass(name, resolve)
+    try {
+      this.synchronized {
+        // First, check if the class has already been loaded
+        super.findLoadedClass(name) match {
+          case null =>
+            // check whether the class should go parent-first
+            PARENT_FIRST_PATTERNS.find(name.startsWith) match {
+              case Some(_) => super.loadClass(name, resolve)
+              case _ => Try(findClass(name)).getOrElse(super.loadClass(name, 
resolve))
+            }
+          case c =>
+            if (resolve) {
+              resolveClass(c)
             }
-          }
-          Try(findClass(name)).getOrElse(super.loadClass(name, resolve))
-        case c: Class[_] =>
-          if (resolve) {
-            resolveClass(c)
-          }
-          c
+            c
+        }
       }
-      clazz
+    } catch {
+      case e: Throwable =>
+        classLoadingExceptionHandler.accept(e)
+        null
     }
   }
 
@@ -99,9 +117,7 @@ class ChildFirstClassLoader(urls: Array[URL], parent: 
ClassLoader, flinkResource
       val spec = urlClassLoaderResource.getFile
       val filename = new File(spec.substring(0, spec.indexOf("!/"))).getName
       val matchState =
-        FLINK_PATTERN.matcher(filename).matches && !flinkResourcePattern
-          .matcher(filename)
-          .matches
+        FLINK_PATTERN.matcher(filename).matches && 
!flinkResourcePattern.matcher(filename).matches
       if (matchState) {
         return null
       }
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 051f53fdf..1d95303ac 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -33,13 +33,9 @@ object FlinkShimsProxy extends Logger {
 
   private[this] val SHIMS_CLASS_LOADER_CACHE = MutableMap[String, 
ClassLoader]()
 
-  private[this] val VERIFY_SQL_CLASS_LOADER_CACHE =
-    MutableMap[String, ClassLoader]()
+  private[this] val VERIFY_SQL_CLASS_LOADER_CACHE = MutableMap[String, 
ClassLoader]()
 
-  private[this] val INCLUDE_PATTERN: Pattern =
-    Pattern.compile(
-      "(streampark-shaded-jackson-)(.*).jar",
-      Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
+  private[this] val INCLUDE_PATTERN: Pattern = 
Pattern.compile("(streampark-shaded-jackson-)(.*).jar", 
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
 
   private[this] def getFlinkShimsResourcePattern(majorVersion: String) =
     Pattern.compile(s"flink-(.*)-$majorVersion(.*).jar", 
Pattern.CASE_INSENSITIVE | Pattern.DOTALL)
@@ -58,8 +54,7 @@ object FlinkShimsProxy extends Logger {
    */
   def proxy[T](flinkVersion: FlinkVersion, func: ClassLoader => T): T = {
     val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
-    ClassLoaderUtils
-      .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+    ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => 
func(shimsClassLoader))
   }
 
   /**
@@ -74,8 +69,7 @@ object FlinkShimsProxy extends Logger {
    */
   def proxy[T](flinkVersion: FlinkVersion, func: JavaFunc[ClassLoader, T]): T 
= {
     val shimsClassLoader = getFlinkShimsClassLoader(flinkVersion)
-    ClassLoaderUtils
-      .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+    ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => 
func(shimsClassLoader))
   }
 
   // need to load all flink-table dependencies compatible with different 
versions
@@ -85,12 +79,10 @@ object FlinkShimsProxy extends Logger {
       s"${flinkVersion.fullVersion}", {
         val getFlinkTable: File => Boolean = 
_.getName.startsWith("flink-table")
         // 1) flink/lib/flink-table*
-        val libTableURL =
-          getFlinkHomeLib(flinkVersion.flinkHome, "lib", getFlinkTable)
+        val libTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "lib", 
getFlinkTable)
 
         // 2) After version 1.15 need add flink/opt/flink-table*
-        val optTableURL =
-          getFlinkHomeLib(flinkVersion.flinkHome, "opt", getFlinkTable)
+        val optTableURL = getFlinkHomeLib(flinkVersion.flinkHome, "opt", 
getFlinkTable)
         val shimsUrls = ListBuffer[URL](libTableURL ++ optTableURL: _*)
 
         // 3) add only streampark shims jar
@@ -158,8 +150,7 @@ object FlinkShimsProxy extends Logger {
    */
   def proxyVerifySql[T](flinkVersion: FlinkVersion, func: 
JavaFunc[ClassLoader, T]): T = {
     val shimsClassLoader = getVerifySqlLibClassLoader(flinkVersion)
-    ClassLoaderUtils
-      .runAsClassLoader[T](shimsClassLoader, () => func(shimsClassLoader))
+    ClassLoaderUtils.runAsClassLoader[T](shimsClassLoader, () => 
func(shimsClassLoader))
   }
 
   private[this] def getFlinkShimsClassLoader(flinkVersion: FlinkVersion): 
ClassLoader = {

Reply via email to