This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 77c806841 [Improve] parse flink version improvement (#3876)
77c806841 is described below

commit 77c806841e5eed5c36f68a095c3c20441582eb87
Author: benjobs <[email protected]>
AuthorDate: Wed Jul 17 14:10:47 2024 +0800

    [Improve] parse flink version improvement (#3876)
    
    * [Improve] parse flink version improvement
    
    * [Improve] minor improvement
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../streampark/common/conf/FlinkVersion.scala      | 31 +++++++++++++++-------
 1 file changed, 21 insertions(+), 10 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index ca77c2343..6ba10923c 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -21,7 +21,6 @@ import org.apache.streampark.common.util.{CommandUtils, 
Logger}
 
 import java.io.File
 import java.net.{URL => NetURL}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import java.util.function.Consumer
 import java.util.regex.Pattern
 
@@ -36,12 +35,16 @@ class FlinkVersion(val flinkHome: String) extends 
java.io.Serializable with Logg
   private[this] lazy val FLINK_VERSION_PATTERN = Pattern.compile("^Version: 
(.*), Commit ID: (.*)$")
 
   private[this] lazy val FLINK_SCALA_VERSION_PATTERN =
-    Pattern.compile("^flink-dist_(.*)-[0-9].*.jar$")
+    Pattern.compile("^flink-dist_(\\d\\.\\d+).*.jar$")
+
+  private[this] lazy val APACHE_FLINK_VERSION_PATTERN = 
Pattern.compile("(^\\d+\\.\\d+\\.\\d+)")
+
+  private[this] lazy val OTHER_FLINK_VERSION_PATTERN = 
Pattern.compile("(\\d+\\.\\d+)(-*)")
 
   lazy val scalaVersion: String = {
     val matcher = FLINK_SCALA_VERSION_PATTERN.matcher(flinkDistJar.getName)
     if (matcher.matches()) {
-      matcher.group(1);
+      matcher.group(1)
     } else {
       // flink 1.15 + on support scala 2.12
       "2.12"
@@ -63,10 +66,9 @@ class FlinkVersion(val flinkHome: String) extends 
java.io.Serializable with Logg
   lazy val flinkLibs: List[NetURL] = 
flinkLib.listFiles().map(_.toURI.toURL).toList
 
   lazy val version: String = {
-    val flinkVersion = new AtomicReference[String]
     val cmd = List(
-      s"java -classpath ${flinkDistJar.getAbsolutePath} 
org.apache.flink.client.cli.CliFrontend --version")
-    val success = new AtomicBoolean(false)
+      s"java -classpath ${flinkDistJar.getName} 
org.apache.flink.client.cli.CliFrontend --version")
+    var flinkVersion: String = null
     val buffer = new mutable.StringBuilder
     CommandUtils.execute(
       flinkLib.getAbsolutePath,
@@ -76,17 +78,26 @@ class FlinkVersion(val flinkHome: String) extends 
java.io.Serializable with Logg
           buffer.append(out).append("\n")
           val matcher = FLINK_VERSION_PATTERN.matcher(out)
           if (matcher.find) {
-            success.set(true)
-            flinkVersion.set(matcher.group(1))
+            val version = matcher.group(1)
+            val matcher1 = APACHE_FLINK_VERSION_PATTERN.matcher(version)
+            if (matcher1.find) {
+              flinkVersion = version
+            } else {
+              val matcher2 = OTHER_FLINK_VERSION_PATTERN.matcher(version)
+              if (matcher2.find) {
+                flinkVersion = version
+              }
+            }
           }
         }
       })
+
     logInfo(buffer.toString())
-    if (!success.get()) {
+    if (flinkVersion == null) {
       throw new IllegalStateException(s"[StreamPark] parse flink version 
failed. $buffer")
     }
     buffer.clear()
-    flinkVersion.get
+    flinkVersion
   }
 
   // flink major version, like "1.13", "1.14"

Reply via email to