This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dependency
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dependency by this push:
new e03dcd825 minor improvement
e03dcd825 is described below
commit e03dcd825299a20794392d3344ef09adb6551a62
Author: benjobs <[email protected]>
AuthorDate: Fri Aug 4 17:33:40 2023 +0800
minor improvement
---
.../apache/streampark/common/zio/ZIOLogger.scala | 12 +++++------
.../console/core/runner/EnvInitializer.java | 2 +-
.../v2/{Config.scala => FlinkK8sConfig.scala} | 2 +-
.../kubernetes/v2/FlinkMemorySizeParser.scala | 24 ++++++++++------------
.../flink/kubernetes/v2/FlinkRestRequest.scala | 8 ++++----
.../v2/{httpfs => fs}/EmbeddedFileServer.scala | 2 +-
.../kubernetes/v2/{httpfs => fs}/FileMirror.scala | 2 +-
.../v2/{httpfs => fs}/FileServerPeerAddress.scala | 2 +-
.../kubernetes/v2/{httpfs => fs}/package.scala | 4 ++--
.../kubernetes/v2/observer/FlinkK8sObserver.scala | 16 +++++++--------
.../v2/observer/RawClusterObserver.scala | 6 ++----
.../v2/observer/RestSvcEndpointObserver.scala | 1 +
.../flink/kubernetes/v2/observer/package.scala | 2 +-
.../flink/kubernetes/v2/operator/CROperator.scala | 13 ++++++++----
.../kubernetes/v2/operator/FlinkK8sOperator.scala | 2 +-
.../v2/operator/{OprErr.scala => OprError.scala} | 2 +-
.../flink/kubernetes/v2/operator/package.scala | 2 +-
.../v2/example/UsingEmbeddedFileServer.scala | 2 +-
.../kubernetes/v2/example/UsingOperator.scala | 2 +-
.../flink/kubernetes/v2/example/package.scala | 2 +-
20 files changed, 55 insertions(+), 53 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOLogger.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOLogger.scala
index 29a6232f6..616c0c0fb 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOLogger.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/zio/ZIOLogger.scala
@@ -24,7 +24,7 @@ import zio.logging.LoggerNameExtractor
import scala.collection.concurrent.TrieMap
-/** ZIO logging Backend that bridging to
[[org.apache.streampark.common.util.Logger]] */
+/** ZIOLogger that bridging to [[org.apache.streampark.common.util.Logger]] */
object ZIOLogger {
lazy val default: ZLayer[Any, Nothing, Unit] =
Runtime.addLogger(provideLogger())
@@ -48,12 +48,12 @@ object ZIOLogger {
private[this] def provideLogger(): ZLogger[String, Unit] = (
trace: Trace,
- fiberId: FiberId,
+ _: FiberId,
logLevel: LogLevel,
message: () => String,
- cause: Cause[Any],
- context: FiberRefs,
- spans: List[LogSpan],
+ _: Cause[Any],
+ _: FiberRefs,
+ _: List[LogSpan],
annotations: Map[String, String]) => {
val loggerName =
@@ -73,7 +73,7 @@ object ZIOLogger {
case LogLevel.Warning => logger.warn(msg)
case LogLevel.Error => logger.error(msg)
case LogLevel.Fatal => logger.error(msg)
- case _ => logger.error(msg)
+ case _ =>
}
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 011218f90..c6cc24ed8 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -30,7 +30,7 @@ import org.apache.streampark.common.zio.ZIOExt;
import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.service.SettingService;
-import org.apache.streampark.flink.kubernetes.v2.httpfs.EmbeddedFileServer;
+import org.apache.streampark.flink.kubernetes.v2.fs.EmbeddedFileServer;
import org.apache.commons.lang3.StringUtils;
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/Config.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkK8sConfig.scala
similarity index 99%
rename from
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/Config.scala
rename to
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkK8sConfig.scala
index 945dcd8cf..1defe9565 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/Config.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkK8sConfig.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.kubernetes.v2
import org.apache.streampark.common.conf.{InternalOption, Workspace}
-object Config {
+object FlinkK8sConfig {
// ----- embedded http file server config -----
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkMemorySizeParser.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkMemorySizeParser.scala
index 758c43eb0..bc0099102 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkMemorySizeParser.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkMemorySizeParser.scala
@@ -24,15 +24,12 @@ object FlinkMemorySizeParser {
private val pattern = raw"(\d+)\s*([a-zA-Z]+)".r
def parse(text: String): Option[MemorySize] = Try {
- val trimmed = text.trim
- if (trimmed.isEmpty) return None
-
- pattern.findFirstMatchIn(text) match {
+ pattern.findFirstMatchIn(text.trim) match {
case None => None
case Some(matched) =>
val size = matched.group(1).toLong
val unit = matched.group(2)
- Unit.all.find(u => u.units.contains(unit)) match {
+ Unit.all.find(_.units.contains(unit.toLowerCase)) match {
case None => None
case Some(hitUnit) => Some(MemorySize(size * hitUnit.multiplier))
}
@@ -47,14 +44,15 @@ object FlinkMemorySizeParser {
def tebiBytes: Long = bytes >> 40
}
- sealed abstract class UnitADT(val units: Array[String], val multiplier: Long)
- object Unit {
- val all = Array(Bytes, KiloBytes, MegaBytes, GigaBytes, TeraBytes)
- case object Bytes extends UnitADT(Array("b", "bytes"), 1L)
- case object KiloBytes extends UnitADT(Array("k", "kb", "kibibytes"), 1024L)
- case object MegaBytes extends UnitADT(Array("m", "mb", "mebibytes"), 1024L
* 1024L)
- case object GigaBytes extends UnitADT(Array("g", "gb", "gibibytes"), 1024L
* 1024L * 1024L)
- case object TeraBytes extends UnitADT(Array("t", "tb", "tebibytes"), 1024L
* 1024L * 1024L * 1024L)
+ private[this] object Unit {
+ sealed abstract class Unit(val units: Array[String], val multiplier: Long)
+
+ lazy val all: Array[Unit] = Array(Bytes, KiloBytes, MegaBytes, GigaBytes,
TeraBytes)
+ case object Bytes extends Unit(Array("b", "bytes"), 1L)
+ case object KiloBytes extends Unit(Array("k", "kb", "kibibytes"), 1024L)
+ case object MegaBytes extends Unit(Array("m", "mb", "mebibytes"), 1024L *
1024L)
+ case object GigaBytes extends Unit(Array("g", "gb", "gibibytes"), 1024L *
1024L * 1024L)
+ case object TeraBytes extends Unit(Array("t", "tb", "tebibytes"), 1024L *
1024L * 1024L * 1024L)
}
}
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
index c96e88d27..9980ed512 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/FlinkRestRequest.scala
@@ -43,7 +43,7 @@ case class FlinkRestRequest(restUrl: String) {
def listJobOverviewInfo: IO[Throwable, Vector[JobOverviewInfo]] =
for {
res <- Client.request(s"$restUrl/jobs/overview")
- rs <- res.body.asJson[JobOverviewRsp]
+ rs <- res.body.asJson[JobOverviewResp]
} yield rs.jobs
/**
@@ -147,10 +147,10 @@ object FlinkRestRequest {
// --- Flink rest api models ---
- case class JobOverviewRsp(jobs: Vector[JobOverviewInfo])
+ case class JobOverviewResp(jobs: Vector[JobOverviewInfo])
- object JobOverviewRsp {
- implicit val codec: JsonCodec[JobOverviewRsp] =
DeriveJsonCodec.gen[JobOverviewRsp]
+ object JobOverviewResp {
+ implicit val codec: JsonCodec[JobOverviewResp] =
DeriveJsonCodec.gen[JobOverviewResp]
}
case class JobOverviewInfo(
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/EmbeddedFileServer.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/EmbeddedFileServer.scala
similarity index 96%
rename from
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/EmbeddedFileServer.scala
rename to
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/EmbeddedFileServer.scala
index ff16bc607..05d4badec 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/EmbeddedFileServer.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/EmbeddedFileServer.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.kubernetes.v2.httpfs
+package org.apache.streampark.flink.kubernetes.v2.fs
import org.apache.streampark.common.zio.ZIOExt.UIOOps
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileMirror.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileMirror.scala
similarity index 97%
rename from
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileMirror.scala
rename to
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileMirror.scala
index 79df265ef..a3cdc4a37 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileMirror.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileMirror.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.kubernetes.v2.httpfs
+package org.apache.streampark.flink.kubernetes.v2.fs
import zio.{IO, UIO, ZIO}
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileServerPeerAddress.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileServerPeerAddress.scala
similarity index 98%
rename from
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileServerPeerAddress.scala
rename to
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileServerPeerAddress.scala
index 3658f41b4..477a7a9bb 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/FileServerPeerAddress.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/FileServerPeerAddress.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.streampark.flink.kubernetes.v2.httpfs
+package org.apache.streampark.flink.kubernetes.v2.fs
import org.apache.streampark.common.zio.ZIOExt.{unsafeRun, UIOOps}
import org.apache.streampark.flink.kubernetes.v2.K8sTools.{newK8sClient,
usingK8sClient}
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/package.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/package.scala
similarity index 87%
rename from
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/package.scala
rename to
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/package.scala
index e5382cd5e..21480e6a6 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/httpfs/package.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/fs/package.scala
@@ -18,9 +18,9 @@
package org.apache.streampark.flink.kubernetes.v2
import org.apache.streampark.common.conf.InternalConfigHolder
-import
org.apache.streampark.flink.kubernetes.v2.Config.{EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR,
EMBEDDED_HTTP_FILE_SERVER_PORT}
+import
org.apache.streampark.flink.kubernetes.v2.FlinkK8sConfig.{EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR,
EMBEDDED_HTTP_FILE_SERVER_PORT}
-package object httpfs {
+package object fs {
lazy val localMirrorDir: String =
InternalConfigHolder.get(EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR)
lazy val fileServerPort: Int =
InternalConfigHolder.get(EMBEDDED_HTTP_FILE_SERVER_PORT)
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
index a355be3d3..ee23bd49d 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala
@@ -131,10 +131,10 @@ object FlinkK8sObserver extends FlinkK8sObserver {
for {
_ <- key match {
- case ApplicationJobKey(id, ns, name) =>
trackCluster(ns, name)
- case SessionJobKey(id, ns, name, clusterName) =>
trackSessionJob(ns, name, clusterName)
- case UnmanagedSessionJobKey(id, clusterNs, clusterId, jid) =>
trackCluster(clusterNs, clusterId)
- case ClusterKey(id, ns, name) =>
trackCluster(ns, name)
+ case ApplicationJobKey(_, ns, name) =>
trackCluster(ns, name)
+ case SessionJobKey(_, ns, name, clusterName) =>
trackSessionJob(ns, name, clusterName)
+ case UnmanagedSessionJobKey(_, clusterNs, clusterId, _) =>
trackCluster(clusterNs, clusterId)
+ case ClusterKey(_, ns, name) =>
trackCluster(ns, name)
}
_ <- trackedKeys.add(key).unit
_ <- logInfo(s"Start watching Flink resource: $key")
@@ -183,10 +183,10 @@ object FlinkK8sObserver extends FlinkK8sObserver {
for {
_ <- key match {
- case ApplicationJobKey(id, ns, name) =>
unTrackCluster(ns, name)
- case SessionJobKey(id, ns, name, clusterName) =>
unTrackSessionJob(ns, name)
- case ClusterKey(id, ns, name) =>
unTrackPureCluster(ns, name)
- case UnmanagedSessionJobKey(id, clusterNs, clusterName, jid) =>
+ case ApplicationJobKey(_, ns, name) =>
unTrackCluster(ns, name)
+ case SessionJobKey(_, ns, name, _) =>
unTrackSessionJob(ns, name)
+ case ClusterKey(_, ns, name) =>
unTrackPureCluster(ns, name)
+ case UnmanagedSessionJobKey(_, clusterNs, clusterName, _) =>
unTrackUnmanagedSessionJob(clusterNs, clusterName)
}
_ <- trackedKeys.remove(key)
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
index d4a7e12b7..5fc545483 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RawClusterObserver.scala
@@ -98,14 +98,12 @@ case class RawClusterObserver(
.map { case (clusterOv, jmConfigs) =>
val totalJmMemory = FlinkMemorySizeParser
.parse(jmConfigs.getOrElse("jobmanager.memory.process.size", "0b"))
- .map(_.mebiBytes)
- .map(e => Try(e.toInt).getOrElse(0))
+ .map(x => Try(x.mebiBytes.toInt).getOrElse(0))
.getOrElse(0)
val totalTmMemory = FlinkMemorySizeParser
.parse(jmConfigs.getOrElse("taskmanager.memory.process.size", "0b"))
- .map(_.mebiBytes * clusterOv.taskManagers)
- .map(e => Try(e.toInt).getOrElse(0))
+ .map(x => Try(x.mebiBytes * clusterOv.taskManagers).getOrElse(0))
.getOrElse(0)
ClusterMetrics(
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RestSvcEndpointObserver.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RestSvcEndpointObserver.scala
index ff97e2c5a..b38d605a3 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RestSvcEndpointObserver.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/RestSvcEndpointObserver.scala
@@ -73,6 +73,7 @@ case class RestSvcEndpointObserver(restSvcEndpointSnaps:
ConcurrentMap[(Namespac
.find(_.getPort == 8081)
.map(_.getTargetPort.getIntVal.toInt)
.getOrElse(8081)
+
Some(RestSvcEndpoint(namespace, name, port, clusterIP))
}
.mapZIO {
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/package.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/package.scala
index 0f1e6dca0..97e8ecaab 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/package.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/package.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.kubernetes.v2
import org.apache.streampark.common.conf.InternalConfigHolder
-import org.apache.streampark.flink.kubernetes.v2.Config._
+import org.apache.streampark.flink.kubernetes.v2.FlinkK8sConfig._
import
org.apache.streampark.flink.kubernetes.v2.observer.AccessFlinkRestType.AccessFlinkRestType
import zio.{durationLong, Duration}
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
index 6cefcf517..e87da3da6 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
@@ -19,7 +19,7 @@ package org.apache.streampark.flink.kubernetes.v2.operator
import org.apache.streampark.flink.kubernetes.v2.{pathLastSegment, yamlMapper}
import org.apache.streampark.flink.kubernetes.v2.K8sTools.usingK8sClient
-import org.apache.streampark.flink.kubernetes.v2.httpfs.FileMirror
+import org.apache.streampark.flink.kubernetes.v2.fs.FileMirror
import org.apache.streampark.flink.kubernetes.v2.model.{FlinkDeploymentDef,
FlinkSessionJobDef, JobDef}
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
@@ -30,6 +30,7 @@ import zio.stream.ZStream
import java.util
+import scala.collection.convert.ImplicitConversions._
import scala.jdk.CollectionConverters._
/**
@@ -157,7 +158,8 @@ object CROperator extends CROperator {
// handle initContainers
val initContainers: util.List[Container] =
Option(spec.getInitContainers).getOrElse(new util.ArrayList())
- val libLoaderInitContainer = new ContainerBuilder()
+
+ val libLoaderInitContainer = new ContainerBuilder()
.withName("userlib-loader")
.withImage("busybox:1.35.0")
.withCommand(
@@ -171,7 +173,9 @@ object CROperator extends CROperator {
.build
)
.build
+
initContainers.add(libLoaderInitContainer)
+
spec.setInitContainers(initContainers)
// handle containers
@@ -185,10 +189,10 @@ object CROperator extends CROperator {
.withSubPath(jarName)
.build)
.toList
- .asJava
val containers: util.List[Container] =
Option(spec.getContainers).getOrElse(new util.ArrayList())
- containers.asScala.zipWithIndex
+
+ containers.zipWithIndex
.find { case (e, _) => e.getName == "flink-main-container" }
.map { case (e, idx) =>
val volMounts = Option(e.getVolumeMounts)
@@ -207,6 +211,7 @@ object CROperator extends CROperator {
.withVolumeMounts(flinkMainContainerVolMounts)
.build)
)
+
spec.setContainers(containers)
// handle volumes
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
index b9e0eac00..d07b60540 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/FlinkK8sOperator.scala
@@ -23,7 +23,7 @@ import
org.apache.streampark.flink.kubernetes.v2.FlinkRestRequest.{StopJobSptReq
import org.apache.streampark.flink.kubernetes.v2.model._
import org.apache.streampark.flink.kubernetes.v2.model.TrackKey._
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
-import org.apache.streampark.flink.kubernetes.v2.operator.OprErr._
+import org.apache.streampark.flink.kubernetes.v2.operator.OprError._
import zio.{durationInt, IO, Schedule, ZIO}
import zio.stream.ZStream
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprErr.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
similarity index 98%
rename from
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprErr.scala
rename to
streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
index f33623b68..daf8b8a03 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprErr.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/OprError.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.v2.operator
-object OprErr {
+object OprError {
case class UnsupportedAction(msg: String) extends Exception("Unsupported
action: " + msg)
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/package.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/package.scala
index 1a37f412b..c2bd7b0f0 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/package.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/package.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.kubernetes.v2
import org.apache.streampark.common.conf.InternalConfigHolder
-import org.apache.streampark.flink.kubernetes.v2.Config.LOG_FLINK_CR_YAML
+import
org.apache.streampark.flink.kubernetes.v2.FlinkK8sConfig.LOG_FLINK_CR_YAML
/**
* Notes:
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingEmbeddedFileServer.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingEmbeddedFileServer.scala
index c005c1ef2..84b31a569 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingEmbeddedFileServer.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingEmbeddedFileServer.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.kubernetes.v2.example
import org.apache.streampark.common.zio.ZIOExt.unsafeRun
-import org.apache.streampark.flink.kubernetes.v2.httpfs.{EmbeddedFileServer,
FileMirror}
+import org.apache.streampark.flink.kubernetes.v2.fs.{EmbeddedFileServer,
FileMirror}
import org.scalatest.{BeforeAndAfterAll, Ignore}
import org.scalatest.wordspec.AnyWordSpecLike
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
index fba938449..2d110e6a3 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/UsingOperator.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.kubernetes.v2.example
import org.apache.streampark.common.zio.{liftValueAsSome, PrettyStringOps}
import
org.apache.streampark.common.zio.ZIOContainerSubscription.{ConcurrentMapExtension,
RefMapExtension}
import org.apache.streampark.common.zio.ZIOExt.{unsafeRun, IOOps, ZStreamOps}
-import org.apache.streampark.flink.kubernetes.v2.httpfs.EmbeddedFileServer
+import org.apache.streampark.flink.kubernetes.v2.fs.EmbeddedFileServer
import org.apache.streampark.flink.kubernetes.v2.model._
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
import org.apache.streampark.flink.kubernetes.v2.operator.FlinkK8sOperator
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/package.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/package.scala
index d2ed58ec0..e75fb0b21 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/package.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-engine/src/test/scala/org/apache/streampark/flink/kubernetes/v2/example/package.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.v2
import org.apache.streampark.common.conf.InternalConfigHolder
-import
org.apache.streampark.flink.kubernetes.v2.Config.EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR
+import
org.apache.streampark.flink.kubernetes.v2.FlinkK8sConfig.EMBEDDED_HTTP_FILE_SERVER_LOCAL_MIRROR_DIR
import org.apache.streampark.flink.kubernetes.v2.example.clearTestAssets
package object example {