This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch docker
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/docker by this push:
new 3b011684b [Improve] vvr flink version parse bug fixed.
3b011684b is described below
commit 3b011684bde30d7983f38ede55544cc41ac9d637
Author: benjobs <[email protected]>
AuthorDate: Mon Jul 15 22:10:30 2024 +0800
[Improve] vvr flink version parse bug fixed.
---
.../streampark/common/conf/FlinkVersion.scala | 26 +++++++++++++++++-----
1 file changed, 20 insertions(+), 6 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index 6dec3b29b..2b4ab6a9c 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -32,12 +32,15 @@ class FlinkVersion(val flinkHome: String) extends
java.io.Serializable with Logg
private[this] lazy val FLINK_VER_PATTERN =
Pattern.compile("^(\\d+\\.\\d+)(\\.)?.*$")
- private[this] lazy val FLINK_VERSION_PATTERN =
- Pattern.compile("^Version: (\\d+\\.\\d+\\.\\d+).*, Commit ID: (.*)$")
+ private[this] lazy val FLINK_VERSION_PATTERN = Pattern.compile("^Version: (.*), Commit ID: (.*)$")
private[this] lazy val FLINK_SCALA_VERSION_PATTERN =
Pattern.compile("^flink-dist_(\\d\\.\\d+).*.jar$")
+ private[this] lazy val APACHE_FLINK_VERSION_PATTERN =
Pattern.compile("(^\\d+\\.\\d+\\.\\d+)")
+
+ private[this] lazy val OTHER_FLINK_VERSION_PATTERN =
Pattern.compile("(\\d+\\.\\d+)(-*)")
+
lazy val scalaVersion: String = {
val matcher = FLINK_SCALA_VERSION_PATTERN.matcher(flinkDistJar.getName)
if (matcher.matches()) {
@@ -64,8 +67,9 @@ class FlinkVersion(val flinkHome: String) extends
java.io.Serializable with Logg
lazy val version: String = {
val flinkVersion = new AtomicReference[String]
+
val cmd = List(
- s"java -classpath ${flinkDistJar.getAbsolutePath} org.apache.flink.client.cli.CliFrontend --version")
+ s"java -classpath ${flinkDistJar.getName} org.apache.flink.client.cli.CliFrontend --version")
val success = new AtomicBoolean(false)
val buffer = new mutable.StringBuilder
CommandUtils.execute(
@@ -76,8 +80,18 @@ class FlinkVersion(val flinkHome: String) extends
java.io.Serializable with Logg
buffer.append(out).append("\n")
val matcher = FLINK_VERSION_PATTERN.matcher(out)
if (matcher.find) {
- success.set(true)
- flinkVersion.set(matcher.group(1))
+ val version = matcher.group(1)
+ val matcher1 = APACHE_FLINK_VERSION_PATTERN.matcher(version)
+ if (matcher1.find) {
+ success.set(true)
+ flinkVersion.set(version)
+ } else {
+ val matcher2 = OTHER_FLINK_VERSION_PATTERN.matcher(version)
+ if (matcher2.find) {
+ success.set(true)
+ flinkVersion.set(s"${matcher2.group(1)}.0")
+ }
+ }
}
}
}
@@ -127,7 +141,7 @@ class FlinkVersion(val flinkHome: String) extends
java.io.Serializable with Logg
}
// StreamPark flink shims version, like "streampark-flink-shims_flink-1.13"
- lazy val shimsVersion: String = s"streampark-flink-shims_flink-$majorVersion"
+ private lazy val shimsVersion: String =
s"streampark-flink-shims_flink-$majorVersion"
override def toString: String =
s"""