wolfboys commented on code in PR #4194:
URL: https://github.com/apache/streampark/pull/4194#discussion_r1957952700
##########
streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala:
##########
@@ -34,6 +35,7 @@ import org.apache.commons.lang3.StringUtils
import java.io.File
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
+import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
Review Comment:
It is recommended to use the project's own `Implicits` utility instead of Scala's `ImplicitConversions`:
```
import org.apache.streampark.common.util.Implicits._
```
##########
streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala:
##########
@@ -139,29 +148,33 @@ class FlinkK8sApplicationBuildPipeline(request: FlinkK8sApplicationBuildRequest)
execStep(5) {
usingDockerClient {
dockerClient =>
- val pullImageCmd = {
- // when the register address prefix is explicitly identified on base image tag,
- // the user's pre-saved docker register auth info would be used.
- val pullImageCmdState =
- dockerConf.registerAddress != null && !baseImageTag.startsWith(
- dockerConf.registerAddress)
- if (pullImageCmdState) {
- dockerClient.pullImageCmd(baseImageTag)
- } else {
- dockerClient
- .pullImageCmd(baseImageTag)
- .withAuthConfig(dockerConf.toAuthConf)
+ if (isImagePresent(dockerClient, baseImageTag)) {
Review Comment:
What do you think about changing the code to this?
```
val imgExists =
dockerClient.listImagesCmd().exec().exists(_.getRepoTags.exists(_.contains(baseImageTag)))
if (imgExists) {
logInfo(s"found local docker image $baseImageTag, no need to pull from remote.")
} else {
val pullCmdCallback = {
// when the register address prefix is explicitly identified on base image tag,
// the user's pre-saved docker register auth info would be used.
val pullImageCmdState =
dockerConf.registerAddress != null &&
!baseImageTag.startsWith(
dockerConf.registerAddress)
if (pullImageCmdState) {
dockerClient.pullImageCmd(baseImageTag)
} else {
dockerClient
.pullImageCmd(baseImageTag)
.withAuthConfig(dockerConf.toAuthConf)
}
}.asInstanceOf[HackPullImageCmd]
.start(watchDockerPullProcess {
pullRsp =>
dockerProcess.pull.update(pullRsp)
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
})
pullCmdCallback.awaitCompletion
logInfo(s"Already pulled docker image from remote register, imageTag=$baseImageTag")
}
```
##########
streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala:
##########
@@ -59,6 +61,13 @@ class FlinkK8sApplicationBuildPipeline(request: FlinkK8sApplicationBuildRequest)
dockerProcessWatcher = watcher
}
+ private def isImagePresent(dockerClient: DockerClient, fullImageName: String): Boolean = {
Review Comment:
This code can be made more concise.
```scala
dockerClient.listImagesCmd().exec().exists(_.getRepoTags.exists(_.contains(baseImageTag)))
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]