This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 7a99081b2 [Improve] vvr flink parse version bug fixed. (#3866)
7a99081b2 is described below

commit 7a99081b2873ab62a4b327174fcf85556be64bd7
Author: benjobs <[email protected]>
AuthorDate: Mon Jul 15 23:14:12 2024 +0800

    [Improve] vvr flink parse version bug fixed. (#3866)
    
    * [Improve] vvr flink version parse bug fixed.
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../streampark/common/conf/FlinkVersion.scala      | 26 +++++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index 6dec3b29b..2b4ab6a9c 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -32,12 +32,15 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
 
   private[this] lazy val FLINK_VER_PATTERN = 
Pattern.compile("^(\\d+\\.\\d+)(\\.)?.*$")
 
-  private[this] lazy val FLINK_VERSION_PATTERN =
-    Pattern.compile("^Version: (\\d+\\.\\d+\\.\\d+).*, Commit ID: (.*)$")
+  private[this] lazy val FLINK_VERSION_PATTERN = Pattern.compile("^Version: 
(.*), Commit ID: (.*)$")
 
   private[this] lazy val FLINK_SCALA_VERSION_PATTERN =
     Pattern.compile("^flink-dist_(\\d\\.\\d+).*.jar$")
 
+  private[this] lazy val APACHE_FLINK_VERSION_PATTERN = 
Pattern.compile("(^\\d+\\.\\d+\\.\\d+)")
+
+  private[this] lazy val OTHER_FLINK_VERSION_PATTERN = 
Pattern.compile("(\\d+\\.\\d+)(-*)")
+
   lazy val scalaVersion: String = {
     val matcher = FLINK_SCALA_VERSION_PATTERN.matcher(flinkDistJar.getName)
     if (matcher.matches()) {
@@ -64,8 +67,9 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
 
   lazy val version: String = {
     val flinkVersion = new AtomicReference[String]
+
     val cmd = List(
-      s"java -classpath ${flinkDistJar.getAbsolutePath} 
org.apache.flink.client.cli.CliFrontend --version")
+      s"java -classpath ${flinkDistJar.getName} 
org.apache.flink.client.cli.CliFrontend --version")
     val success = new AtomicBoolean(false)
     val buffer = new mutable.StringBuilder
     CommandUtils.execute(
@@ -76,8 +80,18 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
           buffer.append(out).append("\n")
           val matcher = FLINK_VERSION_PATTERN.matcher(out)
           if (matcher.find) {
-            success.set(true)
-            flinkVersion.set(matcher.group(1))
+            val version = matcher.group(1)
+            val matcher1 = APACHE_FLINK_VERSION_PATTERN.matcher(version)
+            if (matcher1.find) {
+              success.set(true)
+              flinkVersion.set(version)
+            } else {
+              val matcher2 = OTHER_FLINK_VERSION_PATTERN.matcher(version)
+              if (matcher2.find) {
+                success.set(true)
+                flinkVersion.set(s"${matcher2.group(1)}.0")
+              }
+            }
           }
         }
       }
@@ -127,7 +141,7 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
   }
 
   // StreamPark flink shims version, like "streampark-flink-shims_flink-1.13"
-  lazy val shimsVersion: String = s"streampark-flink-shims_flink-$majorVersion"
+  private lazy val shimsVersion: String = 
s"streampark-flink-shims_flink-$majorVersion"
 
   override def toString: String =
     s"""

Reply via email to