This is an automated email from the ASF dual-hosted git repository.
muchunjin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 78cffdf4a yarn resourceManager URL bug fix (#2798)
78cffdf4a is described below
commit 78cffdf4a35869685049149a8233e23baaf8eee5
Author: benjobs <[email protected]>
AuthorDate: Sun Jun 18 09:26:59 2023 +0800
yarn resourceManager URL bug fix (#2798)
Co-authored-by: benjobs <[email protected]>
---
.../streampark/common/conf/FlinkVersion.scala | 2 +-
.../common/conf/InternalConfigHolder.scala | 2 +-
.../streampark/common/util/CURLBuilder.scala | 2 +-
.../streampark/common/util/CommandUtils.scala | 2 +-
.../streampark/common/util/ConfigUtils.scala | 2 +-
.../apache/streampark/common/util/FileUtils.scala | 2 +-
.../streampark/common/util/HBaseClient.scala | 2 +-
.../streampark/common/util/HadoopConfigUtils.scala | 2 +-
.../streampark/common/util/HadoopUtils.scala | 2 +-
.../apache/streampark/common/util/HostsUtils.scala | 2 +-
.../streampark/common/util/HttpClientUtils.scala | 2 +-
.../apache/streampark/common/util/JdbcUtils.scala | 2 +-
.../streampark/common/util/MongoConfig.scala | 2 +-
.../streampark/common/util/PropertiesUtils.scala | 2 +-
.../streampark/common/util/RedisClient.scala | 2 +-
.../streampark/common/util/RedisEndpoint.scala | 2 +-
.../apache/streampark/common/util/RedisUtils.scala | 2 +-
.../streampark/common/util/SqlConvertUtils.scala | 2 +-
.../org/apache/streampark/common/util/Utils.scala | 2 +-
.../apache/streampark/common/util/YarnUtils.scala | 26 ++++++++++------------
.../streampark/common/util/ZooKeeperUtils.scala | 2 +-
.../console/core/bean/AlertTemplate.java | 2 +-
.../console/core/entity/FlinkCluster.java | 2 +-
.../core/service/impl/FlinkClusterServiceImpl.java | 3 ++-
.../core/service/alert/AlertServiceTest.java | 2 +-
.../flink/client/bean/SubmitRequest.scala | 2 +-
.../impl/KubernetesNativeSessionClient.scala | 2 +-
.../flink/client/impl/YarnPerJobClient.scala | 2 +-
.../flink/client/impl/YarnSessionClient.scala | 2 +-
.../flink/client/trait/FlinkClientTrait.scala | 2 +-
.../flink/client/test/YarnPerJobTestCase.scala | 2 +-
.../flink/connector/conf/ThresholdConf.scala | 2 +-
.../flink/connector/failover/FailoverChecker.scala | 2 +-
.../flink/connector/failover/FailoverWriter.scala | 2 +-
.../flink/connector/failover/SinkBuffer.scala | 2 +-
.../flink/connector/failover/SinkRequest.scala | 2 +-
.../clickhouse/conf/ClickHouseHttpConfig.scala | 2 +-
.../internal/ClickHouseSinkFunction.scala | 2 +-
.../clickhouse/internal/ClickHouseSinkWriter.scala | 2 +-
.../clickhouse/internal/ClickHouseWriterTask.scala | 2 +-
.../connector/doris/conf/DorisConfig.scala | 2 +-
.../connector/elasticsearch5/sink/ES5Sink.scala | 2 +-
.../connector/elasticsearch6/sink/ES6Sink.scala | 2 +-
.../connector/elasticsearch7/sink/ES7Sink.scala | 2 +-
.../hbase/internal/HBaseSinkFunction.scala | 2 +-
.../hbase/internal/HBaseSourceFunction.scala | 2 +-
.../connector/hbase/request/HBaseRequest.scala | 2 +-
.../connector/http/internal/HttpSinkWriter.scala | 2 +-
.../connector/http/internal/HttpWriterTask.scala | 2 +-
.../flink/connector/http/sink/HttpSink.scala | 2 +-
.../connector/influx/function/InfluxFunction.scala | 2 +-
.../jdbc/internal/JdbcSourceFunction.scala | 2 +-
.../flink/connector/kafka/source/KafkaSource.scala | 2 +-
.../mongo/internal/MongoSourceFunction.scala | 2 +-
.../flink/connector/redis/sink/RedisSink.scala | 2 +-
.../flink/kubernetes/FlinkK8sWatchController.scala | 2 +-
.../helper/KubernetesDeploymentHelper.scala | 2 +-
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 2 +-
.../flink/kubernetes/FlinkRestJsonTest.scala | 2 +-
.../flink/packer/maven/DependencyInfo.scala | 2 +-
.../streampark/flink/packer/maven/MavenTool.scala | 2 +-
.../flink/core/FlinkTableInitializer.scala | 2 +-
.../flink/core/conf/FlinkRunOption.scala | 2 +-
.../spark/connector/kafka/offset/HBaseOffset.scala | 2 +-
.../spark/connector/kafka/offset/RedisOffset.scala | 2 +-
65 files changed, 77 insertions(+), 78 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index 5c23b4772..6585f7e56 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.{AtomicBoolean,
AtomicReference}
import java.util.function.Consumer
import java.util.regex.Pattern
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
/** @param flinkHome actual flink home that must be a readable local path */
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
index e06a900a3..73c23a4db 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
@@ -25,7 +25,7 @@ import javax.annotation.{Nonnull, Nullable}
import java.util
import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.language.postfixOps
/**
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala
index ef8bde57d..da4d7fd53 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala
@@ -18,7 +18,7 @@ package org.apache.streampark.common.util
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
class CURLBuilder(val url: String) {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/CommandUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/CommandUtils.scala
index f0a133430..aded4397d 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/CommandUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/CommandUtils.scala
@@ -21,7 +21,7 @@ import java.lang.{Iterable => JavaIterable}
import java.util.Scanner
import java.util.function.Consumer
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}
object CommandUtils extends Logger {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
index 8e6dcf671..e0c159299 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
@@ -20,7 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst._
import java.util.{Map => JavaMap, Properties}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.immutable.{Map => ScalaMap}
import scala.util.Try
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 708943098..b2be8dca2 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -21,7 +21,7 @@ import java.net.URL
import java.util
import java.util.Scanner
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
object FileUtils {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
index e2b36fecc..f128f252b 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation
import java.util.Properties
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
class HBaseClient(func: () => Connection) extends Serializable {
lazy val connection: Connection = func()
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
index 36936e54f..7fa179177 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
@@ -24,8 +24,8 @@ import java.io.File
import java.nio.charset.StandardCharsets
import java.util.{Collections, Map => JavaMap, Optional}
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.immutable.ListMap
import scala.util.{Failure, Success, Try}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index aca0f75a8..fe104c295 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -39,7 +39,7 @@ import java.util
import java.util.{Timer, TimerTask}
import java.util.concurrent._
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}
object HadoopUtils extends Logger {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HostsUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HostsUtils.scala
index 871165c07..65f2ff909 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HostsUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HostsUtils.scala
@@ -22,7 +22,7 @@ import io.netty.resolver.HostsFileParser
import java.net.InetAddress
import java.util.{Map => JavaMap}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.immutable.ListMap
object HostsUtils {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
index 34247a510..a2fa6f644 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala
@@ -33,7 +33,7 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.security.Principal
import java.util.{List => JavaList, Map => JavaMap}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
object HttpClientUtils {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
index 2186b8e8d..6029d88e9 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
@@ -25,7 +25,7 @@ import java.util.Properties
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
index bfbdcbb2d..e04c97d13 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
@@ -22,7 +22,7 @@ import com.mongodb._
import java.util.Properties
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
object MongoConfig {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index 75ab3ceff..87440079b 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -27,8 +27,8 @@ import java.util.{HashMap => JavaMap, LinkedHashMap =>
JavaLinkedMap, Properties
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
import scala.collection.mutable.{Map => MutableMap}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
index 89dbd6063..2b2a4420f 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.annotation.meta.getter
import scala.annotation.tailrec
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.Random
object RedisClient extends Logger {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
index b9758beaf..ab765e691 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
@@ -25,7 +25,7 @@ import redis.clients.jedis.util.{JedisClusterCRC16,
JedisURIHelper, SafeEncoder}
import java.net.URI
import java.util.Properties
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
/**
* RedisEndpoint represents a redis connection endpoint info: host, port, auth
password db number,
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisUtils.scala
index 477f3cffb..b96f06b6d 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisUtils.scala
@@ -21,8 +21,8 @@ import redis.clients.jedis.{Jedis, JedisCluster, Pipeline,
ScanParams}
import java.lang.{Integer => JInt}
import java.util.Set
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.immutable
import scala.util.{Failure, Success, Try}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
index 0c315ac98..ba38150c2 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
@@ -21,7 +21,7 @@ import java.util.Scanner
import java.util.regex.Pattern
import scala.annotation.tailrec
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
object SqlConvertUtils extends Logger {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index 8b6ce53ab..98ec6ce6c 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -24,7 +24,7 @@ import java.net.URL
import java.util.{jar, Collection => JavaCollection, Map => JavaMap,
Properties, UUID}
import java.util.jar.{JarFile, JarInputStream}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}
object Utils {
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 2ffc244dc..cdf973452 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -33,7 +33,7 @@ import java.util
import java.util.{HashMap => JavaHashMap, List => JavaList}
import java.util.concurrent.TimeUnit
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}
import scala.util.control.Breaks.{break, breakable}
@@ -120,15 +120,8 @@ object YarnUtils extends Logger {
if (StringUtils.isNotBlank(PROXY_YARN_URL)) PROXY_YARN_URL else
getRMWebAppURL()
}
- /**
- * <pre>
- *
- * @return
- * </pre>
- */
- def getRMWebAppURL(): String = {
-
- if (rmHttpURL == null) {
+ def getRMWebAppURL(getLatest: Boolean = false): String = {
+ if (rmHttpURL == null || getLatest) {
synchronized {
val conf = HadoopUtils.hadoopConf
val useHttps = YarnConfiguration.useHttps(conf)
@@ -249,7 +242,6 @@ object YarnUtils extends Logger {
* @return
*/
def restRequest(url: String): String = {
- if (url == null) return null
def request(reqUrl: String): String = {
logDebug("request url is " + reqUrl)
@@ -280,10 +272,16 @@ object YarnUtils extends Logger {
}
}
- if (url.startsWith("http://") || url.startsWith("https://")) request(url)
- else {
- request(s"${getRMWebAppURL()}/$url")
+ url match {
+ case u if u.matches("^http(|s)://.*") => request(url)
+ case _ =>
+ val resp = request(s"${getRMWebAppURL()}/$url")
+ if (resp != null) resp;
+ else {
+ request(s"${getRMWebAppURL(true)}/$url")
+ }
}
+
}
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
index b8cd19c28..1d3fc5868 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala
@@ -23,7 +23,7 @@ import org.apache.zookeeper.CreateMode
import java.nio.charset.StandardCharsets
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
object ZooKeeperUtils {
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
index 774dd6a3e..fa6487707 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java
@@ -60,7 +60,7 @@ public class AlertTemplate implements Serializable {
if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
String format = "%s/proxy/%s/";
- String url = String.format(format, YarnUtils.getRMWebAppURL(),
application.getAppId());
+ String url = String.format(format, YarnUtils.getRMWebAppURL(false),
application.getAppId());
template.setLink(url);
} else {
template.setLink(null);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 62cedea94..7bfa52e72 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -178,7 +178,7 @@ public class FlinkCluster implements Serializable {
}
if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) {
try {
- String restUrl = YarnUtils.getRMWebAppURL() + "/proxy/" +
this.clusterId + "/overview";
+ String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" +
this.clusterId + "/overview";
String result =
HttpClientUtils.httpGetRequest(
restUrl,
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 80e527494..cb7e4ce76 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -163,7 +163,8 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
deployResponse,
"Deploy cluster failed, unknown reason,please check you params or
StreamPark error log");
if
(ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
- String address = YarnUtils.getRMWebAppURL() + "/proxy/" +
deployResponse.clusterId() + "/";
+ String address =
+ YarnUtils.getRMWebAppURL(true) + "/proxy/" +
deployResponse.clusterId() + "/";
flinkCluster.setAddress(address);
flinkCluster.setJobManagerUrl(deployResponse.address());
} else {
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
index d85ef7cbc..233bdb242 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java
@@ -205,7 +205,7 @@ class AlertServiceTest {
duration = application.getEndTime().getTime() -
application.getStartTime().getTime();
}
String format = "%s/proxy/%s/";
- String url = String.format(format, YarnUtils.getRMWebAppURL(),
application.getAppId());
+ String url = String.format(format, YarnUtils.getRMWebAppURL(false),
application.getAppId());
AlertTemplate template = new AlertTemplate();
template.setJobName(application.getJobName());
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 2464e26f1..8f29a4cdf 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -33,7 +33,7 @@ import javax.annotation.Nullable
import java.io.File
import java.util.{Map => JavaMap}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.Try
/**
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
index 346a8f1c6..d609b5e5b 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala
@@ -38,7 +38,7 @@ import
org.apache.flink.kubernetes.kubeclient.{FlinkKubeClient, FlinkKubeClientF
import java.io.File
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 79813adf2..1c2bb0394 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId
import java.io.File
import java.lang.{Boolean => JavaBool}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
/** yarn PerJob mode submit */
object YarnPerJobClient extends YarnClientTrait {
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index bbcd0cdaf..6ad07198b 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -35,8 +35,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils
import java.util
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ListBuffer
/** Submit Job to YARN Session Cluster */
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 64c67098e..670d323da 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -43,8 +43,8 @@ import java.io.File
import java.util.{Collections, List => JavaList, Map => JavaMap}
import scala.annotation.tailrec
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.language.postfixOps
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
index 6451d5908..0f8c11c32 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala
@@ -40,8 +40,8 @@ import java.io.File
import java.lang.reflect.Method
import java.util
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
/** perJob to submit jobs programmatically, */
object YarnPerJobTestCase extends Logger {
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/conf/ThresholdConf.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/conf/ThresholdConf.scala
index 529fc9d20..dc3300f5c 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/conf/ThresholdConf.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/conf/ThresholdConf.scala
@@ -22,8 +22,8 @@ import
org.apache.streampark.flink.connector.conf.FailoverStorageType.{Console,
import java.util.Properties
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
case class ThresholdConf(prefixStr: String, parameters: Properties) {
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverChecker.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverChecker.scala
index 11a30f5f7..7a05b99c2 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverChecker.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverChecker.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.util.{Logger, ThreadUtils}
import java.util.concurrent.{Executors, ScheduledExecutorService,
ThreadFactory, TimeUnit}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ListBuffer
case class FailoverChecker(delayTime: Long) extends AutoCloseable with Logger {
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala
index 515583929..1ef414dde 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.producer.{Callback,
KafkaProducer, ProducerRecor
import java.util._
import java.util.concurrent.locks.ReentrantLock
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
class FailoverWriter(failoverStorage: FailoverStorageType, properties:
Properties)
extends AutoCloseable
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkBuffer.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkBuffer.scala
index 531f5c824..8cb9899c0 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkBuffer.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkBuffer.scala
@@ -23,7 +23,7 @@ import java.util
import java.util.Collections
import java.util.concurrent.CopyOnWriteArrayList
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
case class SinkBuffer(writer: SinkWriter, flushInterval: Long, bufferSize: Int)
extends AutoCloseable
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
index c1b8b7a7f..37f94b013 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.failover
import java.util
import java.util.regex.Pattern
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
case class SinkRequest(records: util.List[String], var attemptCounter: Int =
0) {
def incrementCounter(): Unit = attemptCounter += 1
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
index 41d8480ce..c75eb1790 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
@@ -23,7 +23,7 @@ import
org.apache.streampark.flink.connector.conf.ThresholdConf
import java.util.{Base64, Properties}
import java.util.concurrent.ThreadLocalRandom
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
/**
* Flink sink for Clickhouse database. Powered by Async Http Client.
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala
index 038f2dff6..707b14737 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala
@@ -34,7 +34,7 @@ import java.util
import java.util.Properties
import java.util.concurrent.atomic.AtomicLong
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.Try
class ClickHouseSinkFunction[T](apiType: ApiType = ApiType.scala, config:
Properties)
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkWriter.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkWriter.scala
index 590025895..f2b147901 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkWriter.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkWriter.scala
@@ -26,7 +26,7 @@ import org.asynchttpclient.{AsyncHttpClient,
DefaultAsyncHttpClientConfig, Dsl}
import java.util.concurrent._
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ListBuffer
case class ClickHouseSinkWriter(clickHouseConfig: ClickHouseHttpConfig)
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
index a2de0d449..0b9ca1054 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
@@ -26,7 +26,7 @@ import org.asynchttpclient.{AsyncHttpClient,
ListenableFuture, Request, Response
import java.util.concurrent.{BlockingQueue, ExecutorService, TimeUnit}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.Try
case class ClickHouseWriterTask(
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
index a38f2cee3..40a97ff1e 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
@@ -20,7 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst
import java.util.Properties
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
object DorisConfig {
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
index ef658778a..f0fd6b736 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
@@ -34,7 +34,7 @@ import org.elasticsearch.action.ActionRequest
import java.util.{Map => JavaMap, Properties}
import scala.annotation.meta.param
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
object ES5Sink {
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
index 8c898089d..100c9b7b3 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
@@ -36,7 +36,7 @@ import org.elasticsearch.action.ActionRequest
import java.util.Properties
import scala.annotation.meta.param
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
object ES6Sink {
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
index b644404bf..22557d63c 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
@@ -38,7 +38,7 @@ import org.elasticsearch.action.ActionRequest
import java.util.Properties
import scala.annotation.meta.param
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
object ES7Sink {
def apply(
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSinkFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSinkFunction.scala
index 43e633b61..072fc8afc 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSinkFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSinkFunction.scala
@@ -33,7 +33,7 @@ import java.util.Properties
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ArrayBuffer
class HBaseSinkFunction[T](apiType: ApiType = ApiType.scala, tabName: String,
prop: Properties)
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
index 8d22ec817..5ea77453f 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
@@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.client.{Result, Table}
import java.lang
import java.util.Properties
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Success, Try}
class HBaseSourceFunction[R: TypeInformation](apiType: ApiType =
ApiType.scala, prop: Properties)
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/request/HBaseRequest.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/request/HBaseRequest.scala
index 1d666b34d..d82bfe14b 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/request/HBaseRequest.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/request/HBaseRequest.scala
@@ -31,7 +31,7 @@ import java.util.concurrent.{CompletableFuture, Executors,
ExecutorService, Time
import java.util.function.{Consumer, Supplier}
import scala.annotation.meta.param
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
object HBaseRequest {
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpSinkWriter.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpSinkWriter.scala
index 2b3f60203..a7b2f3998 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpSinkWriter.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpSinkWriter.scala
@@ -25,7 +25,7 @@ import org.asynchttpclient.{AsyncHttpClient, Dsl}
import java.util.concurrent._
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ListBuffer
case class HttpSinkWriter(thresholdConf: ThresholdConf, header: Map[String,
String])
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpWriterTask.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpWriterTask.scala
index 2fa31999e..2dda8cc05 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpWriterTask.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpWriterTask.scala
@@ -28,7 +28,7 @@ import org.asynchttpclient.{AsyncHttpClient,
ListenableFuture, Request, Response
import java.util
import java.util.concurrent.{BlockingQueue, ExecutorService, TimeUnit}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.Try
case class HttpWriterTask(
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/sink/HttpSink.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/sink/HttpSink.scala
index 963d5c5a6..058bda317 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/sink/HttpSink.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/sink/HttpSink.scala
@@ -29,7 +29,7 @@ import org.apache.http.client.methods._
import java.util.Properties
import scala.annotation.meta.param
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
object HttpSink {
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
index 43772e45c..41e8e324e 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
@@ -31,7 +31,7 @@ import org.influxdb.dto.Point
import java.util.{Map => JavaMap, Properties}
import java.util.concurrent.TimeUnit
-import scala.collection.JavaConversions._;
+import scala.collection.convert.ImplicitConversions._;
class InfluxFunction[T](config: Properties)(implicit endpoint: InfluxEntity[T])
extends RichSinkFunction[T]
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
index b3cc4f710..e3274fcc7 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
@@ -33,9 +33,9 @@ import
org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont
import java.lang
import java.util.Properties
-import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.Map
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Success, Try}
class JdbcSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
jdbc: Properties)
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
index 15073490c..05e8e7b8c 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
@@ -35,7 +35,7 @@ import java.util.Properties
import java.util.regex.Pattern
import scala.annotation.meta.param
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}
object KafkaSource {
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
index 6e700992c..0e86fbae1 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
@@ -38,7 +38,7 @@ import org.bson.Document
import java.lang
import java.util.Properties
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Success, Try}
class MongoSourceFunction[R: TypeInformation](
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
index 530715101..072e42544 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
@@ -36,7 +36,7 @@ import java.util
import java.util.Properties
import scala.annotation.meta.param
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.Try
object RedisSink {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
index 717208cf0..fa761199a 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala
@@ -24,7 +24,7 @@ import com.github.benmanes.caffeine.cache.{Cache, Caffeine}
import java.util.concurrent.TimeUnit
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
/** Tracking info cache pool on flink kubernetes mode. */
class FlinkK8sWatchController extends Logger with AutoCloseable {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 59c82f471..dc23a0f2a 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -28,7 +28,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient
import java.io.File
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Success, Try}
object KubernetesDeploymentHelper extends Logger {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index 00242aff5..f00b1d42d 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -43,7 +43,7 @@ import java.io.File
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
import scala.concurrent.duration.DurationLong
import scala.language.{implicitConversions, postfixOps}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
index ce431e3a2..c9e9c94c5 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala
@@ -33,7 +33,7 @@ import org.junit.jupiter.api.Test
import java.io.File
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}
// scalastyle:off println
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
index 33f05fc29..c7353e3e1 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.packer.maven
import java.util.{List => JavaList}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
/**
* @param mavenArts
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 9c95748dd..2b07cbc74 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -44,7 +44,7 @@ import javax.annotation.{Nonnull, Nullable}
import java.io.File
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable.ArrayBuffer
import scala.util.Try
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 5d2eee224..2a8cda83f 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -33,7 +33,7 @@ import
org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import java.io.File
import scala.collection.{mutable, Map}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}
private[flink] object FlinkTableInitializer {
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkRunOption.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkRunOption.scala
index 305eaf0fb..dabc6d027 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkRunOption.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkRunOption.scala
@@ -20,7 +20,7 @@ import org.apache.commons.cli.{CommandLine, DefaultParser,
Option, Options}
import java.lang.{Boolean => JavaBoolean}
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.{Failure, Success, Try}
/** Applies to all optional parameters under flink run */
diff --git
a/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/HBaseOffset.scala
b/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/HBaseOffset.scala
index 4561ddfe8..fc95e7b82 100644
---
a/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/HBaseOffset.scala
+++
b/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/HBaseOffset.scala
@@ -28,7 +28,7 @@ import org.apache.spark.SparkConf
import java.util
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.collection.mutable
/**
diff --git
a/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/RedisOffset.scala
b/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/RedisOffset.scala
index b85f3ed16..7fb1c460f 100644
---
a/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/RedisOffset.scala
+++
b/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/RedisOffset.scala
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import redis.clients.jedis.Protocol
-import scala.collection.JavaConversions._
+import scala.collection.convert.ImplicitConversions._
import scala.util.Try
/** Redis Offset Manager */