This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 11f4f0a0c [Improve] Use local docker image cache when build (#4193)
(#4194)
11f4f0a0c is described below
commit 11f4f0a0cdd02f13ddb813f7e6cc388d0f0cf907
Author: Lihe Ma <[email protected]>
AuthorDate: Thu Feb 20 00:03:50 2025 +0800
[Improve] Use local docker image cache when build (#4193) (#4194)
Co-authored-by: lihe.ma <[email protected]>
---
.../impl/FlinkK8sApplicationBuildPipeline.scala | 48 ++++++++++++----------
1 file changed, 27 insertions(+), 21 deletions(-)
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 450c0ba81..484dc5bed 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -18,6 +18,7 @@
package org.apache.streampark.flink.packer.pipeline.impl
import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.common.util.Implicits._
import org.apache.streampark.common.util.ThreadUtils
import org.apache.streampark.flink.kubernetes.PodTemplateTool
import org.apache.streampark.flink.kubernetes.ingress.IngressController
@@ -139,29 +140,34 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
execStep(5) {
usingDockerClient {
dockerClient =>
- val pullImageCmd = {
- // when the register address prefix is explicitly identified on
base image tag,
- // the user's pre-saved docker register auth info would be used.
- val pullImageCmdState =
- dockerConf.registerAddress != null && !baseImageTag.startsWith(
- dockerConf.registerAddress)
- if (pullImageCmdState) {
- dockerClient.pullImageCmd(baseImageTag)
- } else {
- dockerClient
- .pullImageCmd(baseImageTag)
- .withAuthConfig(dockerConf.toAuthConf)
+ val imgExists =
dockerClient.listImagesCmd().exec().exists(_.getRepoTags.exists(_.contains(baseImageTag)))
+ if (imgExists) {
+ logInfo(s"found local docker image $baseImageTag, no need to pull
from remote.")
+ } else {
+ val pullImageCmd = {
+ // when the register address prefix is explicitly identified on
base image tag,
+ // the user's pre-saved docker register auth info would be used.
+ val pullImageCmdState =
+ dockerConf.registerAddress != null && !baseImageTag.startsWith(
+ dockerConf.registerAddress)
+ if (pullImageCmdState) {
+ dockerClient.pullImageCmd(baseImageTag)
+ } else {
+ dockerClient
+ .pullImageCmd(baseImageTag)
+ .withAuthConfig(dockerConf.toAuthConf)
+ }
}
+ val pullCmdCallback = pullImageCmd
+ .asInstanceOf[HackPullImageCmd]
+ .start(watchDockerPullProcess {
+ pullRsp =>
+ dockerProcess.pull.update(pullRsp)
+
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
+ })
+ pullCmdCallback.awaitCompletion
+ logInfo(s"Already pulled docker image from remote register,
imageTag=$baseImageTag")
}
- val pullCmdCallback = pullImageCmd
- .asInstanceOf[HackPullImageCmd]
- .start(watchDockerPullProcess {
- pullRsp =>
- dockerProcess.pull.update(pullRsp)
-
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
- })
- pullCmdCallback.awaitCompletion
- logInfo(s"Already pulled docker image from remote register,
imageTag=$baseImageTag")
}(err => throw new Exception(s"Pull docker image failed,
imageTag=$baseImageTag", err))
}.getOrElse(throw getError.exception)