This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new c4df0b380ef [CAMEL-21238] Reloading in k8s run does not change route 
behaviour (#15672)
c4df0b380ef is described below

commit c4df0b380effe1495fa15a00a98dceb98521c396
Author: Thomas Diesler <[email protected]>
AuthorDate: Wed Sep 25 13:44:02 2024 +0200

    [CAMEL-21238] Reloading in k8s run does not change route behaviour (#15672)
---
 .../core/commands/kubernetes/KubernetesExport.java |  11 +-
 .../core/commands/kubernetes/KubernetesRun.java    | 186 ++++++++++++---------
 .../jbang/core/commands/kubernetes/PodLogs.java    |  32 ++--
 3 files changed, 133 insertions(+), 96 deletions(-)

diff --git 
a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesExport.java
 
b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesExport.java
index f98f5666de8..bdfc62d445d 100644
--- 
a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesExport.java
+++ 
b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesExport.java
@@ -175,6 +175,10 @@ public class KubernetesExport extends Export {
             runtime = RuntimeType.quarkus;
         }
 
+        if (!quiet) {
+            printer().println("Exporting application ...");
+        }
+
         if (!buildTool.equals("maven")) {
             printer().printf("--build-tool=%s is not yet supported%n", 
buildTool);
         }
@@ -423,8 +427,11 @@ public class KubernetesExport extends Export {
     private void addLabel(String key, String value) {
         var labelArray = Optional.ofNullable(labels).orElse(new String[0]);
         var labelList = new ArrayList<>(Arrays.asList(labelArray));
-        labelList.add("%s=%s".formatted(key, value));
-        labels = labelList.toArray(new String[0]);
+        var labelEntry = "%s=%s".formatted(key, value);
+        if (!labelList.contains(labelEntry)) {
+            labelList.add(labelEntry);
+            labels = labelList.toArray(new String[0]);
+        }
     }
 
     private String resolveImageGroup() {
diff --git 
a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesRun.java
 
b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesRun.java
index c7ccd393e0b..d8fec43e52c 100644
--- 
a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesRun.java
+++ 
b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesRun.java
@@ -24,19 +24,17 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import io.fabric8.kubernetes.api.model.Pod;
+import org.apache.camel.CamelContext;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
 import org.apache.camel.dsl.jbang.core.commands.kubernetes.traits.BaseTrait;
-import org.apache.camel.dsl.jbang.core.common.Printer;
 import org.apache.camel.dsl.jbang.core.common.RuntimeCompletionCandidates;
 import org.apache.camel.dsl.jbang.core.common.RuntimeType;
 import org.apache.camel.dsl.jbang.core.common.RuntimeTypeConverter;
 import org.apache.camel.dsl.jbang.core.common.SourceScheme;
-import org.apache.camel.dsl.jbang.core.common.StringPrinter;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.support.FileWatcherResourceReloadStrategy;
 import org.apache.camel.util.FileUtil;
@@ -139,7 +137,7 @@ public class KubernetesRun extends KubernetesBaseCommand {
 
     @CommandLine.Option(names = { "--cluster-type" },
                         description = "The target cluster type. Special 
configurations may be applied to different cluster types such as Kind or 
Minikube.")
-    String clusterType;
+    String clusterType = "Kubernetes";
 
     @CommandLine.Option(names = { "--image-build" }, defaultValue = "true",
                         description = "Whether to build container image as 
part of the run.")
@@ -239,6 +237,9 @@ public class KubernetesRun extends KubernetesBaseCommand {
                         description = "Maven/Gradle build properties, ex. 
--build-property=prop1=foo")
     List<String> buildProperties = new ArrayList<>();
 
+    CamelContext reloadContext;
+    int reloadCount;
+
     public KubernetesRun(CamelJBangMain main) {
         super(main);
     }
@@ -251,30 +252,17 @@ public class KubernetesRun extends KubernetesBaseCommand {
     public Integer doCall() throws Exception {
         String projectName = getProjectName();
 
-        String workingDir = RUN_PLATFORM_DIR + "/" + projectName;
-
-        printer().println("Exporting application ...");
-
-        // Cache export output in String for later usage in case of error
-        Printer runPrinter = printer();
-        StringPrinter exportPrinter = new StringPrinter();
-        getMain().withPrinter(exportPrinter);
-
+        String workingDir = getIndexedWorkingDir(projectName);
         KubernetesExport export = configureExport(workingDir);
         int exit = export.export();
-
-        // Revert printer to this run command's printer
-        getMain().withPrinter(runPrinter);
         if (exit != 0) {
-            // print export command output with error details
-            printer().println(exportPrinter.getOutput());
+            printer().println("Project export failed!");
             return exit;
         }
 
         if (output != null) {
 
             exit = buildProject(workingDir);
-
             if (exit != 0) {
                 printer().println("Project build failed!");
                 return exit;
@@ -299,49 +287,32 @@ public class KubernetesRun extends KubernetesBaseCommand {
             return 0;
         }
 
-        exit = deployProject(workingDir);
-
+        exit = deployProject(workingDir, false);
         if (exit != 0) {
-            printer().println("Deployment to %s 
failed!".formatted(Optional.ofNullable(clusterType)
-                    .map(StringHelper::capitalize).orElse("Kubernetes")));
+            printer().println("Project deploy failed!");
             return exit;
         }
 
-        if (dev) {
-            DefaultCamelContext reloadContext = new DefaultCamelContext(false);
-            configureFileWatch(reloadContext, export, workingDir);
-            reloadContext.start();
-
-            if (cleanup) {
-                installShutdownInterceptor(projectName, workingDir);
-            }
-        }
-
         if (dev || wait || logs) {
+            waitForRunningPod(projectName);
+        }
 
-            if (!quiet) {
-                String kubectlCmd = "kubectl get pod";
-                if (namespace != null) {
-                    kubectlCmd += " -n %s".formatted(namespace);
-                }
-                kubectlCmd += " -l 
%s=%s".formatted(BaseTrait.INTEGRATION_LABEL, projectName);
-                printer().println("Run: " + kubectlCmd);
-            }
-
-            client(Pod.class).withLabel(BaseTrait.INTEGRATION_LABEL, 
projectName)
-                    .waitUntilCondition(it -> 
"Running".equals(it.getStatus().getPhase()), 10, TimeUnit.MINUTES);
+        if (dev) {
+            setupDevMode(projectName, workingDir);
         }
 
         if (dev || logs) {
-            PodLogs logsCommand = new PodLogs(getMain());
-            logsCommand.withClient(client());
-            logsCommand.label = "%s=%s".formatted(BaseTrait.INTEGRATION_LABEL, 
projectName);
-            logsCommand.doCall();
+            startPodLogging(projectName);
+            printer().println("Stopped pod logging!");
         }
 
         return 0;
     }
 
+    private String getIndexedWorkingDir(String projectName) {
+        return RUN_PLATFORM_DIR + "/" + "%s-%03d".formatted(projectName, 
reloadCount);
+    }
+
     private KubernetesExport configureExport(String workingDir) {
         KubernetesExport.ExportConfigurer configurer = new 
KubernetesExport.ExportConfigurer(
                 runtime,
@@ -401,6 +372,84 @@ public class KubernetesRun extends KubernetesBaseCommand {
         return export;
     }
 
+    private void setupDevMode(String projectName, String workingDir) throws 
Exception {
+
+        String watchDir = ".";
+        FileFilter filter = null;
+        if (filePaths != null && filePaths.length > 0) {
+            String filePath = 
FileUtil.onlyPath(SourceScheme.onlyName(filePaths[0]));
+            if (filePath != null) {
+                watchDir = filePath;
+            }
+
+            filter = pathname -> Arrays.stream(filePaths)
+                    .map(FileUtil::stripPath)
+                    .anyMatch(name -> name.equals(pathname.getName()));
+        }
+
+        FileWatcherResourceReloadStrategy reloadStrategy = new 
FileWatcherResourceReloadStrategy(watchDir);
+        reloadStrategy.setResourceReload((name, resource) -> {
+            reloadCount += 1;
+            reloadContext.close();
+            printer().printf("Reloading project due to file change: %s%n", 
FileUtil.stripPath(name));
+            String reloadWorkingDir = getIndexedWorkingDir(projectName);
+            KubernetesExport export = configureExport(reloadWorkingDir);
+            int exit = export.export();
+            if (exit != 0) {
+                printer().printf("Project reexport failed for: %s%n", 
reloadWorkingDir);
+                return;
+            }
+            exit = deployProject(reloadWorkingDir, true);
+            if (exit != 0) {
+                printer().printf("Project redeploy failed for: %s%n", 
reloadWorkingDir);
+                return;
+            }
+            if (dev || wait || logs) {
+                waitForRunningPod(projectName);
+            }
+            if (dev) {
+                setupDevMode(projectName, reloadWorkingDir);
+            }
+            printer().printf("Project reloaded: %s%n", reloadWorkingDir);
+        });
+        if (filter != null) {
+            reloadStrategy.setFileFilter(filter);
+        }
+
+        reloadContext = new DefaultCamelContext(false);
+        reloadContext.addService(reloadStrategy);
+        reloadContext.start();
+
+        if (cleanup) {
+            installShutdownInterceptor(projectName, workingDir);
+        }
+    }
+
+    private void startPodLogging(String projectName) throws Exception {
+        try {
+            PodLogs logsCommand = new PodLogs(getMain());
+            logsCommand.withClient(client());
+            logsCommand.label = "%s=%s".formatted(BaseTrait.INTEGRATION_LABEL, 
projectName);
+            logsCommand.doCall();
+        } catch (Exception e) {
+            printer().println("Failed to read pod logs - " + e);
+            throw e;
+        }
+    }
+
+    private void waitForRunningPod(String projectName) {
+        if (!quiet) {
+            String kubectlCmd = "kubectl get pod";
+            if (namespace != null) {
+                kubectlCmd += " -n %s".formatted(namespace);
+            }
+            kubectlCmd += " -l %s=%s".formatted(BaseTrait.INTEGRATION_LABEL, 
projectName);
+            printer().println("Run: " + kubectlCmd);
+        }
+        client(Pod.class).withLabel(BaseTrait.INTEGRATION_LABEL, projectName)
+                .waitUntilCondition(it -> 
"Running".equals(it.getStatus().getPhase()), 10, TimeUnit.MINUTES);
+    }
+
     private void installShutdownInterceptor(String projectName, String 
workingDir) {
         KubernetesDelete deleteCommand = new KubernetesDelete(getMain());
         deleteCommand.name = projectName;
@@ -454,9 +503,9 @@ public class KubernetesRun extends KubernetesBaseCommand {
         return 0;
     }
 
-    private Integer deployProject(String workingDir) throws IOException, 
InterruptedException {
-        printer().println("Deploying to %s 
...".formatted(Optional.ofNullable(clusterType)
-                .map(StringHelper::capitalize).orElse("Kubernetes")));
+    private Integer deployProject(String workingDir, boolean reload) throws 
Exception {
+
+        printer().println("Deploying to %s ...".formatted(clusterType));
 
         // Run build via Maven
         String mvnw = "/mvnw";
@@ -508,6 +557,9 @@ public class KubernetesRun extends KubernetesBaseCommand {
             }
 
             args.add("package");
+            if (reload) {
+                args.add("k8s:undeploy");
+            }
             args.add("k8s:deploy");
         }
 
@@ -522,43 +574,13 @@ public class KubernetesRun extends KubernetesBaseCommand {
         // wait for that process to exit as we run in foreground
         int exit = p.waitFor();
         if (exit != 0) {
-            printer().println("Deployment failed!");
+            printer().println("Deployment to %s 
failed!".formatted(clusterType));
             return exit;
         }
 
         return 0;
     }
 
-    private void configureFileWatch(DefaultCamelContext camelContext, 
KubernetesExport export, String workingDir)
-            throws Exception {
-        String watchDir = ".";
-        FileFilter filter = null;
-        if (filePaths != null && filePaths.length > 0) {
-            String filePath = 
FileUtil.onlyPath(SourceScheme.onlyName(filePaths[0]));
-            if (filePath != null) {
-                watchDir = filePath;
-            }
-
-            filter = pathname -> Arrays.stream(filePaths)
-                    .map(FileUtil::stripPath)
-                    .anyMatch(name -> name.equals(pathname.getName()));
-        }
-
-        FileWatcherResourceReloadStrategy reloadStrategy
-                = new FileWatcherResourceReloadStrategy(watchDir);
-        reloadStrategy.setResourceReload((name, resource) -> {
-            printer().printf("Reloading project due to file change: %s%n", 
FileUtil.stripPath(name));
-            int refresh = export.export();
-            if (refresh == 0) {
-                deployProject(workingDir);
-            }
-        });
-        if (filter != null) {
-            reloadStrategy.setFileFilter(filter);
-        }
-        camelContext.addService(reloadStrategy);
-    }
-
     private String getProjectName() {
         if (image != null) {
             return KubernetesHelper.sanitize(StringHelper.beforeLast(image, 
":"));
diff --git 
a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/PodLogs.java
 
b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/PodLogs.java
index d55ccf85376..308a6f7db2b 100644
--- 
a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/PodLogs.java
+++ 
b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/PodLogs.java
@@ -19,6 +19,7 @@ package org.apache.camel.dsl.jbang.core.commands.kubernetes;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
@@ -55,7 +56,7 @@ public class PodLogs extends KubernetesBaseCommand {
                         description = "The number of lines from the end of the 
logs to show. Defaults to -1 to show all the lines.")
     int tail = -1;
 
-    int maxWaitAttempts = 20; // total timeout of 60 seconds
+    int maxWaitAttempts = 30; // total timeout of 60 seconds
 
     public PodLogs(CamelJBangMain main) {
         super(main);
@@ -84,16 +85,18 @@ public class PodLogs extends KubernetesBaseCommand {
         }
 
         boolean shouldResume = true;
-        int resumeCount = 0;
+        AtomicInteger resumeCount = new AtomicInteger();
         while (shouldResume) {
             shouldResume = watchLogs(parts[0], parts[1], container, 
resumeCount);
-            resumeCount++;
+            resumeCount.incrementAndGet();
+            printer().printf("PodLogs: [resume=%b, count=%d]%n", shouldResume, 
resumeCount.get());
+            sleepWell();
         }
 
         return 0;
     }
 
-    public boolean watchLogs(String label, String labelValue, String 
container, int resumeCount) {
+    public boolean watchLogs(String label, String labelValue, String 
container, AtomicInteger resumeCount) {
         PodList pods = pods().withLabel(label, labelValue).list();
 
         Pod pod = pods.getItems().stream()
@@ -102,17 +105,13 @@ public class PodLogs extends KubernetesBaseCommand {
                 .orElse(null);
 
         if (pod == null) {
-            if (resumeCount == 0) {
+            if (resumeCount.get() == 0) {
                 printer().printf("Pod for label %s=%s not available - Waiting 
...%n".formatted(label, labelValue));
             }
 
             // use 2-sec delay in waiting for pod logs mode
-            try {
-                Thread.sleep(2000L);
-            } catch (InterruptedException e) {
-                printer().printf("Interrupted while waiting for pod - %s%n", 
e.getMessage());
-            }
-            return resumeCount < maxWaitAttempts;
+            sleepWell();
+            return resumeCount.get() < maxWaitAttempts;
         }
 
         String containerName = null;
@@ -145,11 +144,20 @@ public class PodLogs extends KubernetesBaseCommand {
             String line;
             while ((line = reader.readLine()) != null) {
                 printer().println(line);
+                resumeCount.set(0);
             }
         } catch (IOException e) {
             printer().println("Failed to read pod logs - " + e.getMessage());
         }
 
-        return resumeCount < 25;
+        return resumeCount.get() < maxWaitAttempts;
+    }
+
+    private void sleepWell() {
+        try {
+            Thread.sleep(2000L);
+        } catch (InterruptedException e) {
+            printer().printf("Interrupted while waiting for pod - %s%n", 
e.getMessage());
+        }
     }
 }

Reply via email to