This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dependency
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dependency by this push:
     new e03dcd825 minor improvement
e03dcd825 is described below

commit e03dcd825299a20794392d3344ef09adb6551a62
Author: benjobs <[email protected]>
AuthorDate: Fri Aug 4 17:33:40 2023 +0800

    minor improvement
---
 .../apache/streampark/common/zio/ZIOLogger.scala   | 12 +++++------
 .../console/core/runner/EnvInitializer.java        |  2 +-
 .../v2/{Config.scala => FlinkK8sConfig.scala}      |  2 +-
 .../kubernetes/v2/FlinkMemorySizeParser.scala      | 24 ++++++++++------------
 .../flink/kubernetes/v2/FlinkRestRequest.scala     |  8 ++++----
 .../v2/{httpfs => fs}/EmbeddedFileServer.scala     |  2 +-
 .../kubernetes/v2/{httpfs => fs}/FileMirror.scala  |  2 +-
 .../v2/{httpfs => fs}/FileServerPeerAddress.scala  |  2 +-
 .../kubernetes/v2/{httpfs => fs}/package.scala     |  4 ++--
 .../kubernetes/v2/observer/FlinkK8sObserver.scala  | 16 +++++++--------
 .../v2/observer/RawClusterObserver.scala           |  6 ++----
 .../v2/observer/RestSvcEndpointObserver.scala      |  1 +
 .../flink/kubernetes/v2/observer/package.scala     |  2 +-
 .../flink/kubernetes/v2/operator/CROperator.scala  | 13 ++++++++----
 .../kubernetes/v2/operator/FlinkK8sOperator.scala  |  2 +-
 .../v2/operator/{OprErr.scala => OprError.scala}   |  2 +-
 .../flink/kubernetes/v2/operator/package.scala     |  2 +-
 .../v2/example/UsingEmbeddedFileServer.scala       |  2 +-
 .../kubernetes/v2/example/UsingOperator.scala      |  2 +-
 .../flink/kubernetes/v2/example/package.scala      |  2 +-
 20 files changed, 55 insertions(+), 53 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOLogger.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOLogger.scala
index 29a6232f6..616c0c0fb 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOLogger.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOLogger.scala
@@ -24,7 +24,7 @@ import zio.logging.LoggerNameExtractor
 
 import scala.collection.concurrent.TrieMap
 
-/** ZIO logging Backend that bridging to 
[[org.apache.streampark.common.util.Logger]] */
+/** ZIOLogger that bridging to [[org.apache.streampark.common.util.Logger]] */
 object ZIOLogger {
 
   lazy val default: ZLayer[Any, Nothing, Unit] = 
Runtime.addLogger(provideLogger())
@@ -48,12 +48,12 @@ object ZIOLogger {
 
   private[this] def provideLogger(): ZLogger[String, Unit] = (
       trace: Trace,
-      fiberId: FiberId,
+      _: FiberId,
       logLevel: LogLevel,
       message: () => String,
-      cause: Cause[Any],
-      context: FiberRefs,
-      spans: List[LogSpan],
+      _: Cause[Any],
+      _: FiberRefs,
+      _: List[LogSpan],
       annotations: Map[String, String]) => {
 
     val loggerName =
@@ -73,7 +73,7 @@ object ZIOLogger {
       case LogLevel.Warning => logger.warn(msg)
       case LogLevel.Error => logger.error(msg)
       case LogLevel.Fatal => logger.error(msg)
-      case _ => logger.error(msg)
+      case _ =>
     }
   }
 
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 011218f90..c6cc24ed8 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -30,7 +30,7 @@ import org.apache.streampark.common.zio.ZIOExt;
 import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.service.SettingService;
-import org.apache.streampark.flink.kubernetes.v2.httpfs.EmbeddedFileServer;
+import org.apache.streampark.flink.kubernetes.v2.fs.EmbeddedFileServer;
 
 import org.apache.commons.lang3.StringUtils;
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/Config.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkK8sConfig.scala
similarity index 99%
rename from 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/Config.scala
rename to 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkK8sConfig.scala
index 945dcd8cf..1defe9565 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/Config.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkK8sConfig.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.kubernetes.v2
 
 import org.apache.streampark.common.conf.{InternalOption, Workspace}
 
-object Config {
+object FlinkK8sConfig {
 
   // ----- embedded http file server config -----
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkMemorySizeParser.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkMemorySizeParser.scala
index 758c43eb0..bc0099102 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkMemorySizeParser.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkMemorySizeParser.scala
@@ -24,15 +24,12 @@ object FlinkMemorySizeParser {
   private val pattern = raw"(\d+)\s*([a-zA-Z]+)".r
 
   def parse(text: String): Option[MemorySize] = Try {
-    val trimmed = text.trim
-    if (trimmed.isEmpty) return None
-
-    pattern.findFirstMatchIn(text) match {
+    pattern.findFirstMatchIn(text.trim) match {
       case None          => None
       case Some(matched) =>
         val size = matched.group(1).toLong
         val unit = matched.group(2)
-        Unit.all.find(u => u.units.contains(unit)) match {
+        Unit.all.find(_.units.contains(unit.toLowerCase)) match {
           case None          => None
           case Some(hitUnit) => Some(MemorySize(size * hitUnit.multiplier))
         }
@@ -47,14 +44,15 @@ object FlinkMemorySizeParser {
     def tebiBytes: Long = bytes >> 40
   }
 
-  sealed abstract class UnitADT(val units: Array[String], val multiplier: Long)
-  object Unit {
-    val all = Array(Bytes, KiloBytes, MegaBytes, GigaBytes, TeraBytes)
-    case object Bytes     extends UnitADT(Array("b", "bytes"), 1L)
-    case object KiloBytes extends UnitADT(Array("k", "kb", "kibibytes"), 1024L)
-    case object MegaBytes extends UnitADT(Array("m", "mb", "mebibytes"), 1024L 
* 1024L)
-    case object GigaBytes extends UnitADT(Array("g", "gb", "gibibytes"), 1024L 
* 1024L * 1024L)
-    case object TeraBytes extends UnitADT(Array("t", "tb", "tebibytes"), 1024L 
* 1024L * 1024L * 1024L)
+  private[this] object Unit {
+    sealed abstract class Unit(val units: Array[String], val multiplier: Long)
+
+    lazy val all: Array[Unit] = Array(Bytes, KiloBytes, MegaBytes, GigaBytes, 
TeraBytes)
+    case object Bytes     extends Unit(Array("b", "bytes"), 1L)
+    case object KiloBytes extends Unit(Array("k", "kb", "kibibytes"), 1024L)
+    case object MegaBytes extends Unit(Array("m", "mb", "mebibytes"), 1024L * 
1024L)
+    case object GigaBytes extends Unit(Array("g", "gb", "gibibytes"), 1024L * 
1024L * 1024L)
+    case object TeraBytes extends Unit(Array("t", "tb", "tebibytes"), 1024L * 
1024L * 1024L * 1024L)
   }
 
 }
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
index c96e88d27..9980ed512 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
@@ -43,7 +43,7 @@ case class FlinkRestRequest(restUrl: String) {
   def listJobOverviewInfo: IO[Throwable, Vector[JobOverviewInfo]] =
     for {
       res <- Client.request(s"$restUrl/jobs/overview")
-      rs  <- res.body.asJson[JobOverviewRsp]
+      rs  <- res.body.asJson[JobOverviewResp]
     } yield rs.jobs
 
   /**
@@ -147,10 +147,10 @@ object FlinkRestRequest {
 
   // --- Flink rest api models ---
 
-  case class JobOverviewRsp(jobs: Vector[JobOverviewInfo])
+  case class JobOverviewResp(jobs: Vector[JobOverviewInfo])
 
-  object JobOverviewRsp {
-    implicit val codec: JsonCodec[JobOverviewRsp] = 
DeriveJsonCodec.gen[JobOverviewRsp]
+  object JobOverviewResp {
+    implicit val codec: JsonCodec[JobOverviewResp] = 
DeriveJsonCodec.gen[JobOverviewResp]
   }
 
   case class JobOverviewInfo(
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/EmbeddedFileServer.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/EmbeddedFileServer.scala
similarity index 96%
rename from 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/EmbeddedFileServer.scala
rename to 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/EmbeddedFileServer.scala
index ff16bc607..05d4badec 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/EmbeddedFileServer.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/EmbeddedFileServer.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.kubernetes.v2.httpfs
+package org.apache.streampark.flink.kubernetes.v2.fs
 
 import org.apache.streampark.common.zio.ZIOExt.UIOOps
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileMirror.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileMirror.scala
similarity index 97%
rename from 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileMirror.scala
rename to 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileMirror.scala
index 79df265ef..a3cdc4a37 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileMirror.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileMirror.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.kubernetes.v2.httpfs
+package org.apache.streampark.flink.kubernetes.v2.fs
 
 import zio.{IO, UIO, ZIO}
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileServerPeerAddress.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileServerPeerAddress.scala
similarity index 98%
rename from 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileServerPeerAddress.scala
rename to 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileServerPeerAddress.scala
index 3658f41b4..477a7a9bb 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileServerPeerAddress.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileServerPeerAddress.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.flink.kubernetes.v2.httpfs
+package org.apache.streampark.flink.kubernetes.v2.fs
 
 import org.apache.streampark.common.zio.ZIOExt.{unsafeRun, UIOOps}
 import org.apache.streampark.flink.kubernetes.v2.K8sTools.{newK8sClient, 
usingK8sClient}
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/package.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/package.scala
similarity index 87%
rename from 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/package.scala
rename to 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/package.scala
index e5382cd5e..21480e6a6 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/package.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/package.scala
@@ -18,9 +18,9 @@
 package org.apache.streampark.flink.kubernetes.v2
 
 import org.apache.streampark.common.conf.InternalConfigHolder
-import 
org.apache.streampark.flink.kubernetes.v2.Config.{EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR,
 EMBEDDED_HTTP_FILE_SERVER_PORT}
+import 
org.apache.streampark.flink.kubernetes.v2.FlinkK8sConfig.{EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR,
 EMBEDDED_HTTP_FILE_SERVER_PORT}
 
-package object httpfs {
+package object fs {
 
   lazy val localMirrorDir: String = 
InternalConfigHolder.get(EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR)
   lazy val fileServerPort: Int    = 
InternalConfigHolder.get(EMBEDDED_HTTP_FILE_SERVER_PORT)
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
index a355be3d3..ee23bd49d 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
@@ -131,10 +131,10 @@ object FlinkK8sObserver extends FlinkK8sObserver {
 
     for {
       _ <- key match {
-             case ApplicationJobKey(id, ns, name)                       => 
trackCluster(ns, name)
-             case SessionJobKey(id, ns, name, clusterName)              => 
trackSessionJob(ns, name, clusterName)
-             case UnmanagedSessionJobKey(id, clusterNs, clusterId, jid) => 
trackCluster(clusterNs, clusterId)
-             case ClusterKey(id, ns, name)                              => 
trackCluster(ns, name)
+             case ApplicationJobKey(_, ns, name)                     => 
trackCluster(ns, name)
+             case SessionJobKey(_, ns, name, clusterName)            => 
trackSessionJob(ns, name, clusterName)
+             case UnmanagedSessionJobKey(_, clusterNs, clusterId, _) => 
trackCluster(clusterNs, clusterId)
+             case ClusterKey(_, ns, name)                            => 
trackCluster(ns, name)
            }
       _ <- trackedKeys.add(key).unit
       _ <- logInfo(s"Start watching Flink resource: $key")
@@ -183,10 +183,10 @@ object FlinkK8sObserver extends FlinkK8sObserver {
 
     for {
       _ <- key match {
-             case ApplicationJobKey(id, ns, name)                         => 
unTrackCluster(ns, name)
-             case SessionJobKey(id, ns, name, clusterName)                => 
unTrackSessionJob(ns, name)
-             case ClusterKey(id, ns, name)                                => 
unTrackPureCluster(ns, name)
-             case UnmanagedSessionJobKey(id, clusterNs, clusterName, jid) =>
+             case ApplicationJobKey(_, ns, name)                       => 
unTrackCluster(ns, name)
+             case SessionJobKey(_, ns, name, _)                        => 
unTrackSessionJob(ns, name)
+             case ClusterKey(_, ns, name)                              => 
unTrackPureCluster(ns, name)
+             case UnmanagedSessionJobKey(_, clusterNs, clusterName, _) =>
                unTrackUnmanagedSessionJob(clusterNs, clusterName)
            }
       _ <- trackedKeys.remove(key)
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
index d4a7e12b7..5fc545483 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
@@ -98,14 +98,12 @@ case class RawClusterObserver(
       .map { case (clusterOv, jmConfigs) =>
         val totalJmMemory = FlinkMemorySizeParser
           .parse(jmConfigs.getOrElse("jobmanager.memory.process.size", "0b"))
-          .map(_.mebiBytes)
-          .map(e => Try(e.toInt).getOrElse(0))
+          .map(x => Try(x.mebiBytes.toInt).getOrElse(0))
           .getOrElse(0)
 
         val totalTmMemory = FlinkMemorySizeParser
           .parse(jmConfigs.getOrElse("taskmanager.memory.process.size", "0b"))
-          .map(_.mebiBytes * clusterOv.taskManagers)
-          .map(e => Try(e.toInt).getOrElse(0))
+          .map(x => Try(x.mebiBytes * clusterOv.taskManagers).getOrElse(0))
           .getOrElse(0)
 
         ClusterMetrics(
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RestSvcEndpointObserver.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RestSvcEndpointObserver.scala
index ff97e2c5a..b38d605a3 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RestSvcEndpointObserver.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RestSvcEndpointObserver.scala
@@ -73,6 +73,7 @@ case class RestSvcEndpointObserver(restSvcEndpointSnaps: 
ConcurrentMap[(Namespac
               .find(_.getPort == 8081)
               .map(_.getTargetPort.getIntVal.toInt)
               .getOrElse(8081)
+
             Some(RestSvcEndpoint(namespace, name, port, clusterIP))
         }
         .mapZIO {
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/package.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/package.scala
index 0f1e6dca0..97e8ecaab 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/package.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/package.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.kubernetes.v2
 
 import org.apache.streampark.common.conf.InternalConfigHolder
-import org.apache.streampark.flink.kubernetes.v2.Config._
+import org.apache.streampark.flink.kubernetes.v2.FlinkK8sConfig._
 import 
org.apache.streampark.flink.kubernetes.v2.observer.AccessFlinkRestType.AccessFlinkRestType
 
 import zio.{durationLong, Duration}
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
index 6cefcf517..e87da3da6 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.kubernetes.v2.operator
 
 import org.apache.streampark.flink.kubernetes.v2.{pathLastSegment, yamlMapper}
 import org.apache.streampark.flink.kubernetes.v2.K8sTools.usingK8sClient
-import org.apache.streampark.flink.kubernetes.v2.httpfs.FileMirror
+import org.apache.streampark.flink.kubernetes.v2.fs.FileMirror
 import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef, 
FlinkSessionJobDef, JobDef}
 import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
 
@@ -30,6 +30,7 @@ import zio.stream.ZStream
 
 import java.util
 
+import scala.collection.convert.ImplicitConversions._
 import scala.jdk.CollectionConverters._
 
 /**
@@ -157,7 +158,8 @@ object CROperator extends CROperator {
 
       // handle initContainers
       val initContainers: util.List[Container] = 
Option(spec.getInitContainers).getOrElse(new util.ArrayList())
-      val libLoaderInitContainer               = new ContainerBuilder()
+
+      val libLoaderInitContainer = new ContainerBuilder()
         .withName("userlib-loader")
         .withImage("busybox:1.35.0")
         .withCommand(
@@ -171,7 +173,9 @@ object CROperator extends CROperator {
             .build
         )
         .build
+
       initContainers.add(libLoaderInitContainer)
+
       spec.setInitContainers(initContainers)
 
       // handle containers
@@ -185,10 +189,10 @@ object CROperator extends CROperator {
               .withSubPath(jarName)
               .build)
           .toList
-          .asJava
 
       val containers: util.List[Container] = 
Option(spec.getContainers).getOrElse(new util.ArrayList())
-      containers.asScala.zipWithIndex
+
+      containers.zipWithIndex
         .find { case (e, _) => e.getName == "flink-main-container" }
         .map { case (e, idx) =>
           val volMounts = Option(e.getVolumeMounts)
@@ -207,6 +211,7 @@ object CROperator extends CROperator {
               .withVolumeMounts(flinkMainContainerVolMounts)
               .build)
         )
+
       spec.setContainers(containers)
 
       // handle volumes
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
index b9e0eac00..d07b60540 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
@@ -23,7 +23,7 @@ import 
org.apache.streampark.flink.kubernetes.v2.FlinkRestRequest.{StopJobSptReq
 import org.apache.streampark.flink.kubernetes.v2.model._
 import org.apache.streampark.flink.kubernetes.v2.model.TrackKey._
 import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
-import org.apache.streampark.flink.kubernetes.v2.operator.OprErr._
+import org.apache.streampark.flink.kubernetes.v2.operator.OprError._
 
 import zio.{durationInt, IO, Schedule, ZIO}
 import zio.stream.ZStream
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprErr.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
similarity index 98%
rename from 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprErr.scala
rename to 
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
index f33623b68..daf8b8a03 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprErr.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.v2.operator
 
-object OprErr {
+object OprError {
 
   case class UnsupportedAction(msg: String) extends Exception("Unsupported 
action: " + msg)
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/package.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/package.scala
index 1a37f412b..c2bd7b0f0 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/package.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/package.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.kubernetes.v2
 
 import org.apache.streampark.common.conf.InternalConfigHolder
-import org.apache.streampark.flink.kubernetes.v2.Config.LOG_FLINK_CR_YAML
+import 
org.apache.streampark.flink.kubernetes.v2.FlinkK8sConfig.LOG_FLINK_CR_YAML
 
 /**
  * Notes:
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingEmbeddedFileServer.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingEmbeddedFileServer.scala
index c005c1ef2..84b31a569 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingEmbeddedFileServer.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingEmbeddedFileServer.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.kubernetes.v2.example
 
 import org.apache.streampark.common.zio.ZIOExt.unsafeRun
-import org.apache.streampark.flink.kubernetes.v2.httpfs.{EmbeddedFileServer, 
FileMirror}
+import org.apache.streampark.flink.kubernetes.v2.fs.{EmbeddedFileServer, 
FileMirror}
 
 import org.scalatest.{BeforeAndAfterAll, Ignore}
 import org.scalatest.wordspec.AnyWordSpecLike
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
index fba938449..2d110e6a3 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.kubernetes.v2.example
 import org.apache.streampark.common.zio.{liftValueAsSome, PrettyStringOps}
 import 
org.apache.streampark.common.zio.ZIOContainerSubscription.{ConcurrentMapExtension,
 RefMapExtension}
 import org.apache.streampark.common.zio.ZIOExt.{unsafeRun, IOOps, ZStreamOps}
-import org.apache.streampark.flink.kubernetes.v2.httpfs.EmbeddedFileServer
+import org.apache.streampark.flink.kubernetes.v2.fs.EmbeddedFileServer
 import org.apache.streampark.flink.kubernetes.v2.model._
 import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
 import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/package.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/package.scala
index d2ed58ec0..e75fb0b21 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/package.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/package.scala
@@ -17,7 +17,7 @@
 package org.apache.streampark.flink.kubernetes.v2
 
 import org.apache.streampark.common.conf.InternalConfigHolder
-import 
org.apache.streampark.flink.kubernetes.v2.Config.EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR
+import 
org.apache.streampark.flink.kubernetes.v2.FlinkK8sConfig.EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR
 import org.apache.streampark.flink.kubernetes.v2.example.clearTestAssets
 
 package object example {

Reply via email to