This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch yarn in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 6e1bf9005183b816e67985b1d837c3f6a8dbd0b6 Author: benjobs <[email protected]> AuthorDate: Wed Jun 14 14:46:13 2023 +0800 yarn resourceManager URL bug fix --- .../streampark/common/conf/FlinkVersion.scala | 2 +- .../common/conf/InternalConfigHolder.scala | 2 +- .../streampark/common/util/CURLBuilder.scala | 2 +- .../streampark/common/util/CommandUtils.scala | 2 +- .../streampark/common/util/ConfigUtils.scala | 2 +- .../apache/streampark/common/util/FileUtils.scala | 2 +- .../streampark/common/util/HBaseClient.scala | 2 +- .../streampark/common/util/HadoopConfigUtils.scala | 2 +- .../streampark/common/util/HadoopUtils.scala | 2 +- .../apache/streampark/common/util/HostsUtils.scala | 2 +- .../streampark/common/util/HttpClientUtils.scala | 2 +- .../apache/streampark/common/util/JdbcUtils.scala | 2 +- .../streampark/common/util/MongoConfig.scala | 2 +- .../streampark/common/util/PropertiesUtils.scala | 2 +- .../streampark/common/util/RedisClient.scala | 2 +- .../streampark/common/util/RedisEndpoint.scala | 2 +- .../apache/streampark/common/util/RedisUtils.scala | 2 +- .../streampark/common/util/SqlConvertUtils.scala | 2 +- .../org/apache/streampark/common/util/Utils.scala | 2 +- .../apache/streampark/common/util/YarnUtils.scala | 26 ++++++++++------------ .../streampark/common/util/ZooKeeperUtils.scala | 2 +- .../console/core/bean/AlertTemplate.java | 2 +- .../console/core/entity/FlinkCluster.java | 2 +- .../core/service/impl/FlinkClusterServiceImpl.java | 3 ++- .../core/service/alert/AlertServiceTest.java | 2 +- .../flink/client/bean/SubmitRequest.scala | 2 +- .../impl/KubernetesNativeSessionClient.scala | 2 +- .../flink/client/impl/YarnPerJobClient.scala | 2 +- .../flink/client/impl/YarnSessionClient.scala | 2 +- .../flink/client/trait/FlinkClientTrait.scala | 2 +- .../flink/client/test/YarnPerJobTestCase.scala | 2 +- .../flink/connector/conf/ThresholdConf.scala | 2 +- .../flink/connector/failover/FailoverChecker.scala | 2 +- .../flink/connector/failover/FailoverWriter.scala | 2 +- .../flink/connector/failover/SinkBuffer.scala | 2 +- .../flink/connector/failover/SinkRequest.scala | 2 +- .../clickhouse/conf/ClickHouseHttpConfig.scala | 2 +- .../internal/ClickHouseSinkFunction.scala | 2 +- .../clickhouse/internal/ClickHouseSinkWriter.scala | 2 +- .../clickhouse/internal/ClickHouseWriterTask.scala | 2 +- .../connector/doris/conf/DorisConfig.scala | 2 +- .../connector/elasticsearch5/sink/ES5Sink.scala | 2 +- .../connector/elasticsearch6/sink/ES6Sink.scala | 2 +- .../connector/elasticsearch7/sink/ES7Sink.scala | 2 +- .../hbase/internal/HBaseSinkFunction.scala | 2 +- .../hbase/internal/HBaseSourceFunction.scala | 2 +- .../connector/hbase/request/HBaseRequest.scala | 2 +- .../connector/http/internal/HttpSinkWriter.scala | 2 +- .../connector/http/internal/HttpWriterTask.scala | 2 +- .../flink/connector/http/sink/HttpSink.scala | 2 +- .../connector/influx/function/InfluxFunction.scala | 2 +- .../jdbc/internal/JdbcSourceFunction.scala | 2 +- .../flink/connector/kafka/source/KafkaSource.scala | 2 +- .../mongo/internal/MongoSourceFunction.scala | 2 +- .../flink/connector/redis/sink/RedisSink.scala | 2 +- .../flink/kubernetes/FlinkK8sWatchController.scala | 2 +- .../helper/KubernetesDeploymentHelper.scala | 2 +- .../kubernetes/watcher/FlinkJobStatusWatcher.scala | 2 +- .../flink/kubernetes/FlinkRestJsonTest.scala | 2 +- .../flink/packer/maven/DependencyInfo.scala | 2 +- .../streampark/flink/packer/maven/MavenTool.scala | 2 +- .../flink/core/FlinkTableInitializer.scala | 2 +- .../flink/core/conf/FlinkRunOption.scala | 2 +- .../spark/connector/kafka/offset/HBaseOffset.scala | 2 +- .../spark/connector/kafka/offset/RedisOffset.scala | 2 +- 65 files changed, 77 insertions(+), 78 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala index 5c23b4772..6585f7e56 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import java.util.function.Consumer import java.util.regex.Pattern -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable /** @param flinkHome actual flink home that must be a readable local path */ diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala index e06a900a3..73c23a4db 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala @@ -25,7 +25,7 @@ import javax.annotation.{Nonnull, Nullable} import java.util import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.language.postfixOps /** diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala index ef8bde57d..da4d7fd53 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/CURLBuilder.scala @@ -18,7 +18,7 @@ package org.apache.streampark.common.util import java.util -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ class CURLBuilder(val url: String) { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/CommandUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/CommandUtils.scala index f0a133430..aded4397d 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/CommandUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/CommandUtils.scala @@ -21,7 +21,7 @@ import java.lang.{Iterable => JavaIterable} import java.util.Scanner import java.util.function.Consumer -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Failure, Success, Try} object CommandUtils extends Logger { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala index 8e6dcf671..e0c159299 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala @@ -20,7 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst._ import java.util.{Map => JavaMap, Properties} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.immutable.{Map => ScalaMap} import scala.util.Try diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala index 708943098..b2be8dca2 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala @@ -21,7 +21,7 @@ import java.net.URL import java.util import java.util.Scanner -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable object FileUtils { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala index e2b36fecc..f128f252b 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala @@ -25,7 +25,7 @@ import org.apache.hadoop.security.UserGroupInformation import java.util.Properties -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ class HBaseClient(func: () => Connection) extends Serializable { lazy val connection: Connection = func() diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala index 36936e54f..7fa179177 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala @@ -24,8 +24,8 @@ import java.io.File import java.nio.charset.StandardCharsets import java.util.{Collections, Map => JavaMap, Optional} -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.immutable.ListMap import scala.util.{Failure, Success, Try} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala index aca0f75a8..fe104c295 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala @@ -39,7 +39,7 @@ import java.util import java.util.{Timer, TimerTask} import java.util.concurrent._ -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Failure, Success, Try} object HadoopUtils extends Logger { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HostsUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HostsUtils.scala index 871165c07..65f2ff909 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HostsUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HostsUtils.scala @@ -22,7 +22,7 @@ import io.netty.resolver.HostsFileParser import java.net.InetAddress import java.util.{Map => JavaMap} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.immutable.ListMap object HostsUtils { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala index 34247a510..a2fa6f644 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HttpClientUtils.scala @@ -33,7 +33,7 @@ import java.nio.charset.{Charset, StandardCharsets} import java.security.Principal import java.util.{List => JavaList, Map => JavaMap} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ object HttpClientUtils { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala index 2186b8e8d..6029d88e9 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala @@ -25,7 +25,7 @@ import java.util.Properties import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.locks.ReentrantLock -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.Try diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala index bfbdcbb2d..e04c97d13 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala @@ -22,7 +22,7 @@ import com.mongodb._ import java.util.Properties -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ object MongoConfig { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala index 75ab3ceff..87440079b 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala @@ -27,8 +27,8 @@ import java.util.{HashMap => JavaMap, LinkedHashMap => JavaLinkedMap, Properties import java.util.concurrent.atomic.AtomicInteger import java.util.regex.Pattern -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable import scala.collection.mutable.{Map => MutableMap} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala index 89dbd6063..2b2a4420f 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala @@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap import scala.annotation.meta.getter import scala.annotation.tailrec -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.Random object RedisClient extends Logger { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala index b9758beaf..ab765e691 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala @@ -25,7 +25,7 @@ import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder} import java.net.URI import java.util.Properties -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ /** * RedisEndpoint represents a redis connection endpoint info: host, port, auth password db number, diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisUtils.scala index 477f3cffb..b96f06b6d 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisUtils.scala @@ -21,8 +21,8 @@ import redis.clients.jedis.{Jedis, JedisCluster, Pipeline, ScanParams} import java.lang.{Integer => JInt} import java.util.Set -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.immutable import scala.util.{Failure, Success, Try} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala index 0c315ac98..ba38150c2 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala @@ -21,7 +21,7 @@ import java.util.Scanner import java.util.regex.Pattern import scala.annotation.tailrec -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ object SqlConvertUtils extends Logger { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index 8b6ce53ab..98ec6ce6c 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -24,7 +24,7 @@ import java.net.URL import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID} import java.util.jar.{JarFile, JarInputStream} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Failure, Success, Try} object Utils { diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala index 2ffc244dc..cdf973452 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala @@ -33,7 +33,7 @@ import java.util import java.util.{HashMap => JavaHashMap, List => JavaList} import java.util.concurrent.TimeUnit -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Success, Try} import scala.util.control.Breaks.{break, breakable} @@ -120,15 +120,8 @@ object YarnUtils extends Logger { if (StringUtils.isNotBlank(PROXY_YARN_URL)) PROXY_YARN_URL else getRMWebAppURL() } - /** - * <pre> - * - * @return - * </pre> - */ - def getRMWebAppURL(): String = { - - if (rmHttpURL == null) { + def getRMWebAppURL(getLatest: Boolean = false): String = { + if (rmHttpURL == null || getLatest) { synchronized { val conf = HadoopUtils.hadoopConf val useHttps = YarnConfiguration.useHttps(conf) @@ -249,7 +242,6 @@ object YarnUtils extends Logger { * @return */ def restRequest(url: String): String = { - if (url == null) return null def request(reqUrl: String): String = { logDebug("request url is " + reqUrl) @@ -280,10 +272,16 @@ object YarnUtils extends Logger { } } - if (url.startsWith("http://") || url.startsWith("https://")) request(url) - else { - request(s"${getRMWebAppURL()}/$url") + url match { + case u if u.matches("^http(|s)://.*") => request(url) + case _ => + val resp = request(s"${getRMWebAppURL()}/$url") + if (resp != null) resp; + else { + request(s"${getRMWebAppURL(true)}/$url") + } } + } } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala index b8cd19c28..1d3fc5868 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ZooKeeperUtils.scala @@ -23,7 +23,7 @@ import org.apache.zookeeper.CreateMode import java.nio.charset.StandardCharsets -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable object ZooKeeperUtils { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java index 774dd6a3e..fa6487707 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/bean/AlertTemplate.java @@ -60,7 +60,7 @@ public class AlertTemplate implements Serializable { if (ExecutionMode.isYarnMode(application.getExecutionMode())) { String format = "%s/proxy/%s/"; - String url = String.format(format, YarnUtils.getRMWebAppURL(), application.getAppId()); + String url = String.format(format, YarnUtils.getRMWebAppURL(false), application.getAppId()); template.setLink(url); } else { template.setLink(null); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java index 62cedea94..7bfa52e72 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java @@ -178,7 +178,7 @@ public class FlinkCluster implements Serializable { } if (ExecutionMode.YARN_SESSION.equals(this.getExecutionModeEnum())) { try { - String restUrl = YarnUtils.getRMWebAppURL() + "/proxy/" + this.clusterId + "/overview"; + String restUrl = YarnUtils.getRMWebAppURL(true) + "/proxy/" + this.clusterId + "/overview"; String result = HttpClientUtils.httpGetRequest( restUrl, diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java index 80e527494..cb7e4ce76 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java @@ -163,7 +163,8 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli deployResponse, "Deploy cluster failed, unknown reason,please check you params or StreamPark error log"); if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) { - String address = YarnUtils.getRMWebAppURL() + "/proxy/" + deployResponse.clusterId() + "/"; + String address = + YarnUtils.getRMWebAppURL(true) + "/proxy/" + deployResponse.clusterId() + "/"; flinkCluster.setAddress(address); flinkCluster.setJobManagerUrl(deployResponse.address()); } else { diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java index d85ef7cbc..233bdb242 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/alert/AlertServiceTest.java @@ -205,7 +205,7 @@ class AlertServiceTest { duration = application.getEndTime().getTime() - application.getStartTime().getTime(); } String format = "%s/proxy/%s/"; - String url = String.format(format, YarnUtils.getRMWebAppURL(), application.getAppId()); + String url = String.format(format, YarnUtils.getRMWebAppURL(false), application.getAppId()); AlertTemplate template = new AlertTemplate(); template.setJobName(application.getJobName()); diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index 2464e26f1..8f29a4cdf 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -33,7 +33,7 @@ import javax.annotation.Nullable import java.io.File import java.util.{Map => JavaMap} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.Try /** diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala index 346a8f1c6..d609b5e5b 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala @@ -38,7 +38,7 @@ import org.apache.flink.kubernetes.kubeclient.{FlinkKubeClient, FlinkKubeClientF import java.io.File -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.language.postfixOps import scala.util.{Failure, Success, Try} diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala index 79813adf2..1c2bb0394 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId import java.io.File import java.lang.{Boolean => JavaBool} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ /** yarn PerJob mode submit */ object YarnPerJobClient extends YarnClientTrait { diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index bbcd0cdaf..6ad07198b 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -35,8 +35,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils import java.util -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ListBuffer /** Submit Job to YARN Session Cluster */ diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 64c67098e..670d323da 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -43,8 +43,8 @@ import java.io.File import java.util.{Collections, List => JavaList, Map => JavaMap} import scala.annotation.tailrec -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.language.postfixOps diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala index 6451d5908..0f8c11c32 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/test/scala/org/apache/streampark/flink/client/test/YarnPerJobTestCase.scala @@ -40,8 +40,8 @@ import java.io.File import java.lang.reflect.Method import java.util -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.convert.ImplicitConversions._ /** perJob to submit jobs programmatically, */ object YarnPerJobTestCase extends Logger { diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/conf/ThresholdConf.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/conf/ThresholdConf.scala index 529fc9d20..dc3300f5c 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/conf/ThresholdConf.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/conf/ThresholdConf.scala @@ -22,8 +22,8 @@ import org.apache.streampark.flink.connector.conf.FailoverStorageType.{Console, import java.util.Properties -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.convert.ImplicitConversions._ case class ThresholdConf(prefixStr: String, parameters: Properties) { diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverChecker.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverChecker.scala index 11a30f5f7..7a05b99c2 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverChecker.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverChecker.scala @@ -21,7 +21,7 @@ import org.apache.streampark.common.util.{Logger, ThreadUtils} import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ListBuffer case class FailoverChecker(delayTime: Long) extends AutoCloseable with Logger { diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala index 515583929..1ef414dde 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/FailoverWriter.scala @@ -26,7 +26,7 @@ import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecor import java.util._ import java.util.concurrent.locks.ReentrantLock -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ class FailoverWriter(failoverStorage: FailoverStorageType, properties: Properties) extends AutoCloseable diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkBuffer.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkBuffer.scala index 531f5c824..8cb9899c0 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkBuffer.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkBuffer.scala @@ -23,7 +23,7 @@ import java.util import java.util.Collections import java.util.concurrent.CopyOnWriteArrayList -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ case class SinkBuffer(writer: SinkWriter, flushInterval: Long, bufferSize: Int) extends AutoCloseable diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala index c1b8b7a7f..37f94b013 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala @@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.failover import java.util import java.util.regex.Pattern -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0) { def incrementCounter(): Unit = attemptCounter += 1 diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala index 41d8480ce..c75eb1790 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala @@ -23,7 +23,7 @@ import org.apache.streampark.flink.connector.conf.ThresholdConf import java.util.{Base64, Properties} import java.util.concurrent.ThreadLocalRandom -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ /** * Flink sink for Clickhouse database. Powered by Async Http Client. diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala index 038f2dff6..707b14737 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkFunction.scala @@ -34,7 +34,7 @@ import java.util import java.util.Properties import java.util.concurrent.atomic.AtomicLong -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.Try class ClickHouseSinkFunction[T](apiType: ApiType = ApiType.scala, config: Properties) diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkWriter.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkWriter.scala index 590025895..f2b147901 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkWriter.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseSinkWriter.scala @@ -26,7 +26,7 @@ import org.asynchttpclient.{AsyncHttpClient, DefaultAsyncHttpClientConfig, Dsl} import java.util.concurrent._ -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ListBuffer case class ClickHouseSinkWriter(clickHouseConfig: ClickHouseHttpConfig) diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala index a2de0d449..0b9ca1054 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala @@ -26,7 +26,7 @@ import org.asynchttpclient.{AsyncHttpClient, ListenableFuture, Request, Response import java.util.concurrent.{BlockingQueue, ExecutorService, TimeUnit} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.Try case class ClickHouseWriterTask( diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala index a38f2cee3..40a97ff1e 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala @@ -20,7 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst import java.util.Properties -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ object DorisConfig { diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala index ef658778a..f0fd6b736 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala @@ -34,7 +34,7 @@ import org.elasticsearch.action.ActionRequest import java.util.{Map => JavaMap, Properties} import scala.annotation.meta.param -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ object ES5Sink { diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala index 8c898089d..100c9b7b3 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala @@ -36,7 +36,7 @@ import org.elasticsearch.action.ActionRequest import java.util.Properties import scala.annotation.meta.param -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ object ES6Sink { diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala index b644404bf..22557d63c 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala @@ -38,7 +38,7 @@ import org.elasticsearch.action.ActionRequest import java.util.Properties import scala.annotation.meta.param -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ object ES7Sink { def apply( diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSinkFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSinkFunction.scala index 43e633b61..072fc8afc 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSinkFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSinkFunction.scala @@ -33,7 +33,7 @@ import java.util.Properties import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ArrayBuffer class HBaseSinkFunction[T](apiType: ApiType = ApiType.scala, tabName: String, prop: Properties) diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala index 8d22ec817..5ea77453f 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala @@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.client.{Result, Table} import java.lang import java.util.Properties -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Success, Try} class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala, prop: Properties) diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/request/HBaseRequest.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/request/HBaseRequest.scala index 1d666b34d..d82bfe14b 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/request/HBaseRequest.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/request/HBaseRequest.scala @@ -31,7 +31,7 @@ import java.util.concurrent.{CompletableFuture, Executors, ExecutorService, Time import java.util.function.{Consumer, Supplier} import scala.annotation.meta.param -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ object HBaseRequest { diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpSinkWriter.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpSinkWriter.scala index 2b3f60203..a7b2f3998 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpSinkWriter.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpSinkWriter.scala @@ -25,7 +25,7 @@ import org.asynchttpclient.{AsyncHttpClient, Dsl} import java.util.concurrent._ -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ListBuffer case class HttpSinkWriter(thresholdConf: ThresholdConf, header: Map[String, String]) diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpWriterTask.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpWriterTask.scala index 2fa31999e..2dda8cc05 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpWriterTask.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/internal/HttpWriterTask.scala @@ -28,7 +28,7 @@ import org.asynchttpclient.{AsyncHttpClient, ListenableFuture, Request, Response import java.util import java.util.concurrent.{BlockingQueue, ExecutorService, TimeUnit} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.Try case class HttpWriterTask( diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/sink/HttpSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/sink/HttpSink.scala index 963d5c5a6..058bda317 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/sink/HttpSink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/sink/HttpSink.scala @@ -29,7 +29,7 @@ import org.apache.http.client.methods._ import java.util.Properties import scala.annotation.meta.param -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ object HttpSink { diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala index 43772e45c..41e8e324e 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala @@ -31,7 +31,7 @@ import org.influxdb.dto.Point import java.util.{Map => JavaMap, Properties} import java.util.concurrent.TimeUnit -import scala.collection.JavaConversions._; +import scala.collection.convert.ImplicitConversions._; class InfluxFunction[T](config: Properties)(implicit endpoint: InfluxEntity[T]) extends RichSinkFunction[T] diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala index b3cc4f710..e3274fcc7 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala @@ -33,9 +33,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceCont import java.lang import java.util.Properties -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.Map +import scala.collection.convert.ImplicitConversions._ import scala.util.{Success, Try} class JdbcSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala, jdbc: Properties) diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala index 15073490c..05e8e7b8c 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala @@ -35,7 +35,7 @@ import java.util.Properties import java.util.regex.Pattern import scala.annotation.meta.param -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Failure, Success, Try} object KafkaSource { diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala index 6e700992c..0e86fbae1 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala @@ -38,7 +38,7 @@ import org.bson.Document import java.lang import java.util.Properties -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Success, Try} class MongoSourceFunction[R: TypeInformation]( diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala index 530715101..072e42544 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala @@ -36,7 +36,7 @@ import java.util import java.util.Properties import scala.annotation.meta.param -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.Try object RedisSink { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala index 717208cf0..fa761199a 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala @@ -24,7 +24,7 @@ import com.github.benmanes.caffeine.cache.{Cache, Caffeine} import java.util.concurrent.TimeUnit -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ /** Tracking info cache pool on flink kubernetes mode. */ class FlinkK8sWatchController extends Logger with AutoCloseable { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala index 59c82f471..dc23a0f2a 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala @@ -28,7 +28,7 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient import java.io.File -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Success, Try} object KubernetesDeploymentHelper extends Logger { diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala index 00242aff5..f00b1d42d 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala @@ -43,7 +43,7 @@ import java.io.File import java.nio.charset.StandardCharsets import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future} import scala.concurrent.duration.DurationLong import scala.language.{implicitConversions, postfixOps} diff --git a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala index ce431e3a2..c9e9c94c5 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/test/scala/org/apache/streampark/flink/kubernetes/FlinkRestJsonTest.scala @@ -33,7 +33,7 @@ import org.junit.jupiter.api.Test import java.io.File -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Failure, Success, Try} // scalastyle:off println diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala index 33f05fc29..c7353e3e1 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/DependencyInfo.scala @@ -19,7 +19,7 @@ package org.apache.streampark.flink.packer.maven import java.util.{List => JavaList} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ /** * @param mavenArts diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala index 9c95748dd..2b07cbc74 100644 --- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala +++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala @@ -44,7 +44,7 @@ import javax.annotation.{Nonnull, Nullable} import java.io.File import java.util -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable.ArrayBuffer import scala.util.Try diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index 5d2eee224..2a8cda83f 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -33,7 +33,7 @@ import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment import java.io.File import scala.collection.{mutable, Map} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Failure, Success, Try} private[flink] object FlinkTableInitializer { diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkRunOption.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkRunOption.scala index 305eaf0fb..dabc6d027 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkRunOption.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkRunOption.scala @@ -20,7 +20,7 @@ import org.apache.commons.cli.{CommandLine, DefaultParser, Option, Options} import java.lang.{Boolean => JavaBoolean} -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.{Failure, Success, Try} /** Applies to all optional parameters under flink run */ diff --git a/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/HBaseOffset.scala b/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/HBaseOffset.scala index 4561ddfe8..fc95e7b82 100644 --- a/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/HBaseOffset.scala +++ b/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/HBaseOffset.scala @@ -28,7 +28,7 @@ import org.apache.spark.SparkConf import java.util -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable /** diff --git a/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/RedisOffset.scala b/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/RedisOffset.scala index b85f3ed16..7fb1c460f 100644 --- a/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/RedisOffset.scala +++ b/streampark-spark/streampark-spark-connector/streampark-spark-connector-kafka/src/main/scala/org/apache/streampark/spark/connector/kafka/offset/RedisOffset.scala @@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.SparkConf import redis.clients.jedis.Protocol -import scala.collection.JavaConversions._ +import scala.collection.convert.ImplicitConversions._ import scala.util.Try /** Redis Offset Manager */
