This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 11f4f0a0c [Improve] Use local docker image cache when build (#4193) (#4194)
11f4f0a0c is described below

commit 11f4f0a0cdd02f13ddb813f7e6cc388d0f0cf907
Author: Lihe Ma <[email protected]>
AuthorDate: Thu Feb 20 00:03:50 2025 +0800

    [Improve] Use local docker image cache when build (#4193) (#4194)
    
    Co-authored-by: lihe.ma <[email protected]>
---
 .../impl/FlinkK8sApplicationBuildPipeline.scala    | 48 ++++++++++++----------
 1 file changed, 27 insertions(+), 21 deletions(-)

diff --git a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 450c0ba81..484dc5bed 100644
--- a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -18,6 +18,7 @@
 package org.apache.streampark.flink.packer.pipeline.impl
 
 import org.apache.streampark.common.fs.LfsOperator
+import org.apache.streampark.common.util.Implicits._
 import org.apache.streampark.common.util.ThreadUtils
 import org.apache.streampark.flink.kubernetes.PodTemplateTool
 import org.apache.streampark.flink.kubernetes.ingress.IngressController
@@ -139,29 +140,34 @@ class FlinkK8sApplicationBuildPipeline(request: FlinkK8sApplicationBuildRequest)
     execStep(5) {
       usingDockerClient {
         dockerClient =>
-          val pullImageCmd = {
-            // when the register address prefix is explicitly identified on 
base image tag,
-            // the user's pre-saved docker register auth info would be used.
-            val pullImageCmdState =
-              dockerConf.registerAddress != null && !baseImageTag.startsWith(
-                dockerConf.registerAddress)
-            if (pullImageCmdState) {
-              dockerClient.pullImageCmd(baseImageTag)
-            } else {
-              dockerClient
-                .pullImageCmd(baseImageTag)
-                .withAuthConfig(dockerConf.toAuthConf)
+          val imgExists = 
dockerClient.listImagesCmd().exec().exists(_.getRepoTags.exists(_.contains(baseImageTag)))
+          if (imgExists) {
+            logInfo(s"found local docker image $baseImageTag, no need to pull 
from remote.")
+          } else {
+            val pullImageCmd = {
+              // when the register address prefix is explicitly identified on 
base image tag,
+              // the user's pre-saved docker register auth info would be used.
+              val pullImageCmdState =
+                dockerConf.registerAddress != null && !baseImageTag.startsWith(
+                  dockerConf.registerAddress)
+              if (pullImageCmdState) {
+                dockerClient.pullImageCmd(baseImageTag)
+              } else {
+                dockerClient
+                  .pullImageCmd(baseImageTag)
+                  .withAuthConfig(dockerConf.toAuthConf)
+              }
             }
+            val pullCmdCallback = pullImageCmd
+              .asInstanceOf[HackPullImageCmd]
+              .start(watchDockerPullProcess {
+                pullRsp =>
+                  dockerProcess.pull.update(pullRsp)
+                  
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
+              })
+            pullCmdCallback.awaitCompletion
+            logInfo(s"Already pulled docker image from remote register, 
imageTag=$baseImageTag")
           }
-          val pullCmdCallback = pullImageCmd
-            .asInstanceOf[HackPullImageCmd]
-            .start(watchDockerPullProcess {
-              pullRsp =>
-                dockerProcess.pull.update(pullRsp)
-                
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
-            })
-          pullCmdCallback.awaitCompletion
-          logInfo(s"Already pulled docker image from remote register, 
imageTag=$baseImageTag")
       }(err => throw new Exception(s"Pull docker image failed, 
imageTag=$baseImageTag", err))
     }.getOrElse(throw getError.exception)
 

Reply via email to