This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new c4df0b380ef [CAMEL-21238] Reloading in k8s run does not change route behaviour (#15672)
c4df0b380ef is described below
commit c4df0b380effe1495fa15a00a98dceb98521c396
Author: Thomas Diesler <[email protected]>
AuthorDate: Wed Sep 25 13:44:02 2024 +0200
[CAMEL-21238] Reloading in k8s run does not change route behaviour (#15672)
---
.../core/commands/kubernetes/KubernetesExport.java | 11 +-
.../core/commands/kubernetes/KubernetesRun.java | 186 ++++++++++++---------
.../jbang/core/commands/kubernetes/PodLogs.java | 32 ++--
3 files changed, 133 insertions(+), 96 deletions(-)
diff --git a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesExport.java b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesExport.java
index f98f5666de8..bdfc62d445d 100644
--- a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesExport.java
+++ b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesExport.java
@@ -175,6 +175,10 @@ public class KubernetesExport extends Export {
runtime = RuntimeType.quarkus;
}
+ if (!quiet) {
+ printer().println("Exporting application ...");
+ }
+
if (!buildTool.equals("maven")) {
printer().printf("--build-tool=%s is not yet supported%n",
buildTool);
}
@@ -423,8 +427,11 @@ public class KubernetesExport extends Export {
private void addLabel(String key, String value) {
var labelArray = Optional.ofNullable(labels).orElse(new String[0]);
var labelList = new ArrayList<>(Arrays.asList(labelArray));
- labelList.add("%s=%s".formatted(key, value));
- labels = labelList.toArray(new String[0]);
+ var labelEntry = "%s=%s".formatted(key, value);
+ if (!labelList.contains(labelEntry)) {
+ labelList.add(labelEntry);
+ labels = labelList.toArray(new String[0]);
+ }
}
private String resolveImageGroup() {
diff --git a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesRun.java b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesRun.java
index c7ccd393e0b..d8fec43e52c 100644
--- a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesRun.java
+++ b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/KubernetesRun.java
@@ -24,19 +24,17 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
import io.fabric8.kubernetes.api.model.Pod;
+import org.apache.camel.CamelContext;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.dsl.jbang.core.commands.CamelJBangMain;
import org.apache.camel.dsl.jbang.core.commands.kubernetes.traits.BaseTrait;
-import org.apache.camel.dsl.jbang.core.common.Printer;
import org.apache.camel.dsl.jbang.core.common.RuntimeCompletionCandidates;
import org.apache.camel.dsl.jbang.core.common.RuntimeType;
import org.apache.camel.dsl.jbang.core.common.RuntimeTypeConverter;
import org.apache.camel.dsl.jbang.core.common.SourceScheme;
-import org.apache.camel.dsl.jbang.core.common.StringPrinter;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.support.FileWatcherResourceReloadStrategy;
import org.apache.camel.util.FileUtil;
@@ -139,7 +137,7 @@ public class KubernetesRun extends KubernetesBaseCommand {
@CommandLine.Option(names = { "--cluster-type" },
description = "The target cluster type. Special
configurations may be applied to different cluster types such as Kind or
Minikube.")
- String clusterType;
+ String clusterType = "Kubernetes";
@CommandLine.Option(names = { "--image-build" }, defaultValue = "true",
description = "Whether to build container image as
part of the run.")
@@ -239,6 +237,9 @@ public class KubernetesRun extends KubernetesBaseCommand {
description = "Maven/Gradle build properties, ex.
--build-property=prop1=foo")
List<String> buildProperties = new ArrayList<>();
+ CamelContext reloadContext;
+ int reloadCount;
+
public KubernetesRun(CamelJBangMain main) {
super(main);
}
@@ -251,30 +252,17 @@ public class KubernetesRun extends KubernetesBaseCommand {
public Integer doCall() throws Exception {
String projectName = getProjectName();
- String workingDir = RUN_PLATFORM_DIR + "/" + projectName;
-
- printer().println("Exporting application ...");
-
- // Cache export output in String for later usage in case of error
- Printer runPrinter = printer();
- StringPrinter exportPrinter = new StringPrinter();
- getMain().withPrinter(exportPrinter);
-
+ String workingDir = getIndexedWorkingDir(projectName);
KubernetesExport export = configureExport(workingDir);
int exit = export.export();
-
- // Revert printer to this run command's printer
- getMain().withPrinter(runPrinter);
if (exit != 0) {
- // print export command output with error details
- printer().println(exportPrinter.getOutput());
+ printer().println("Project export failed!");
return exit;
}
if (output != null) {
exit = buildProject(workingDir);
-
if (exit != 0) {
printer().println("Project build failed!");
return exit;
@@ -299,49 +287,32 @@ public class KubernetesRun extends KubernetesBaseCommand {
return 0;
}
- exit = deployProject(workingDir);
-
+ exit = deployProject(workingDir, false);
if (exit != 0) {
- printer().println("Deployment to %s
failed!".formatted(Optional.ofNullable(clusterType)
- .map(StringHelper::capitalize).orElse("Kubernetes")));
+ printer().println("Project deploy failed!");
return exit;
}
- if (dev) {
- DefaultCamelContext reloadContext = new DefaultCamelContext(false);
- configureFileWatch(reloadContext, export, workingDir);
- reloadContext.start();
-
- if (cleanup) {
- installShutdownInterceptor(projectName, workingDir);
- }
- }
-
if (dev || wait || logs) {
+ waitForRunningPod(projectName);
+ }
- if (!quiet) {
- String kubectlCmd = "kubectl get pod";
- if (namespace != null) {
- kubectlCmd += " -n %s".formatted(namespace);
- }
- kubectlCmd += " -l
%s=%s".formatted(BaseTrait.INTEGRATION_LABEL, projectName);
- printer().println("Run: " + kubectlCmd);
- }
-
- client(Pod.class).withLabel(BaseTrait.INTEGRATION_LABEL,
projectName)
- .waitUntilCondition(it ->
"Running".equals(it.getStatus().getPhase()), 10, TimeUnit.MINUTES);
+ if (dev) {
+ setupDevMode(projectName, workingDir);
}
if (dev || logs) {
- PodLogs logsCommand = new PodLogs(getMain());
- logsCommand.withClient(client());
- logsCommand.label = "%s=%s".formatted(BaseTrait.INTEGRATION_LABEL,
projectName);
- logsCommand.doCall();
+ startPodLogging(projectName);
+ printer().println("Stopped pod logging!");
}
return 0;
}
+ private String getIndexedWorkingDir(String projectName) {
+ return RUN_PLATFORM_DIR + "/" + "%s-%03d".formatted(projectName,
reloadCount);
+ }
+
private KubernetesExport configureExport(String workingDir) {
KubernetesExport.ExportConfigurer configurer = new
KubernetesExport.ExportConfigurer(
runtime,
@@ -401,6 +372,84 @@ public class KubernetesRun extends KubernetesBaseCommand {
return export;
}
+ private void setupDevMode(String projectName, String workingDir) throws
Exception {
+
+ String watchDir = ".";
+ FileFilter filter = null;
+ if (filePaths != null && filePaths.length > 0) {
+ String filePath =
FileUtil.onlyPath(SourceScheme.onlyName(filePaths[0]));
+ if (filePath != null) {
+ watchDir = filePath;
+ }
+
+ filter = pathname -> Arrays.stream(filePaths)
+ .map(FileUtil::stripPath)
+ .anyMatch(name -> name.equals(pathname.getName()));
+ }
+
+ FileWatcherResourceReloadStrategy reloadStrategy = new
FileWatcherResourceReloadStrategy(watchDir);
+ reloadStrategy.setResourceReload((name, resource) -> {
+ reloadCount += 1;
+ reloadContext.close();
+ printer().printf("Reloading project due to file change: %s%n",
FileUtil.stripPath(name));
+ String reloadWorkingDir = getIndexedWorkingDir(projectName);
+ KubernetesExport export = configureExport(reloadWorkingDir);
+ int exit = export.export();
+ if (exit != 0) {
+ printer().printf("Project reexport failed for: %s%n",
reloadWorkingDir);
+ return;
+ }
+ exit = deployProject(reloadWorkingDir, true);
+ if (exit != 0) {
+ printer().printf("Project redeploy failed for: %s%n",
reloadWorkingDir);
+ return;
+ }
+ if (dev || wait || logs) {
+ waitForRunningPod(projectName);
+ }
+ if (dev) {
+ setupDevMode(projectName, reloadWorkingDir);
+ }
+ printer().printf("Project reloaded: %s%n", reloadWorkingDir);
+ });
+ if (filter != null) {
+ reloadStrategy.setFileFilter(filter);
+ }
+
+ reloadContext = new DefaultCamelContext(false);
+ reloadContext.addService(reloadStrategy);
+ reloadContext.start();
+
+ if (cleanup) {
+ installShutdownInterceptor(projectName, workingDir);
+ }
+ }
+
+ private void startPodLogging(String projectName) throws Exception {
+ try {
+ PodLogs logsCommand = new PodLogs(getMain());
+ logsCommand.withClient(client());
+ logsCommand.label = "%s=%s".formatted(BaseTrait.INTEGRATION_LABEL,
projectName);
+ logsCommand.doCall();
+ } catch (Exception e) {
+ printer().println("Failed to read pod logs - " + e);
+ throw e;
+ }
+ }
+
+ private void waitForRunningPod(String projectName) {
+ if (!quiet) {
+ String kubectlCmd = "kubectl get pod";
+ if (namespace != null) {
+ kubectlCmd += " -n %s".formatted(namespace);
+ }
+ kubectlCmd += " -l %s=%s".formatted(BaseTrait.INTEGRATION_LABEL,
projectName);
+ printer().println("Run: " + kubectlCmd);
+ }
+ client(Pod.class).withLabel(BaseTrait.INTEGRATION_LABEL, projectName)
+ .waitUntilCondition(it ->
"Running".equals(it.getStatus().getPhase()), 10, TimeUnit.MINUTES);
+ }
+
private void installShutdownInterceptor(String projectName, String
workingDir) {
KubernetesDelete deleteCommand = new KubernetesDelete(getMain());
deleteCommand.name = projectName;
@@ -454,9 +503,9 @@ public class KubernetesRun extends KubernetesBaseCommand {
return 0;
}
- private Integer deployProject(String workingDir) throws IOException,
InterruptedException {
- printer().println("Deploying to %s
...".formatted(Optional.ofNullable(clusterType)
- .map(StringHelper::capitalize).orElse("Kubernetes")));
+ private Integer deployProject(String workingDir, boolean reload) throws
Exception {
+
+ printer().println("Deploying to %s ...".formatted(clusterType));
// Run build via Maven
String mvnw = "/mvnw";
@@ -508,6 +557,9 @@ public class KubernetesRun extends KubernetesBaseCommand {
}
args.add("package");
+ if (reload) {
+ args.add("k8s:undeploy");
+ }
args.add("k8s:deploy");
}
@@ -522,43 +574,13 @@ public class KubernetesRun extends KubernetesBaseCommand {
// wait for that process to exit as we run in foreground
int exit = p.waitFor();
if (exit != 0) {
- printer().println("Deployment failed!");
+ printer().println("Deployment to %s
failed!".formatted(clusterType));
return exit;
}
return 0;
}
- private void configureFileWatch(DefaultCamelContext camelContext,
KubernetesExport export, String workingDir)
- throws Exception {
- String watchDir = ".";
- FileFilter filter = null;
- if (filePaths != null && filePaths.length > 0) {
- String filePath =
FileUtil.onlyPath(SourceScheme.onlyName(filePaths[0]));
- if (filePath != null) {
- watchDir = filePath;
- }
-
- filter = pathname -> Arrays.stream(filePaths)
- .map(FileUtil::stripPath)
- .anyMatch(name -> name.equals(pathname.getName()));
- }
-
- FileWatcherResourceReloadStrategy reloadStrategy
- = new FileWatcherResourceReloadStrategy(watchDir);
- reloadStrategy.setResourceReload((name, resource) -> {
- printer().printf("Reloading project due to file change: %s%n",
FileUtil.stripPath(name));
- int refresh = export.export();
- if (refresh == 0) {
- deployProject(workingDir);
- }
- });
- if (filter != null) {
- reloadStrategy.setFileFilter(filter);
- }
- camelContext.addService(reloadStrategy);
- }
-
private String getProjectName() {
if (image != null) {
return KubernetesHelper.sanitize(StringHelper.beforeLast(image,
":"));
diff --git a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/PodLogs.java b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/PodLogs.java
index d55ccf85376..308a6f7db2b 100644
--- a/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/PodLogs.java
+++ b/dsl/camel-jbang/camel-jbang-plugin-kubernetes/src/main/java/org/apache/camel/dsl/jbang/core/commands/kubernetes/PodLogs.java
@@ -19,6 +19,7 @@ package org.apache.camel.dsl.jbang.core.commands.kubernetes;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.util.concurrent.atomic.AtomicInteger;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
@@ -55,7 +56,7 @@ public class PodLogs extends KubernetesBaseCommand {
description = "The number of lines from the end of the
logs to show. Defaults to -1 to show all the lines.")
int tail = -1;
- int maxWaitAttempts = 20; // total timeout of 60 seconds
+ int maxWaitAttempts = 30; // total timeout of 60 seconds
public PodLogs(CamelJBangMain main) {
super(main);
@@ -84,16 +85,18 @@ public class PodLogs extends KubernetesBaseCommand {
}
boolean shouldResume = true;
- int resumeCount = 0;
+ AtomicInteger resumeCount = new AtomicInteger();
while (shouldResume) {
shouldResume = watchLogs(parts[0], parts[1], container,
resumeCount);
- resumeCount++;
+ resumeCount.incrementAndGet();
+ printer().printf("PodLogs: [resume=%b, count=%d]%n", shouldResume,
resumeCount.get());
+ sleepWell();
}
return 0;
}
- public boolean watchLogs(String label, String labelValue, String
container, int resumeCount) {
+ public boolean watchLogs(String label, String labelValue, String
container, AtomicInteger resumeCount) {
PodList pods = pods().withLabel(label, labelValue).list();
Pod pod = pods.getItems().stream()
@@ -102,17 +105,13 @@ public class PodLogs extends KubernetesBaseCommand {
.orElse(null);
if (pod == null) {
- if (resumeCount == 0) {
+ if (resumeCount.get() == 0) {
printer().printf("Pod for label %s=%s not available - Waiting
...%n".formatted(label, labelValue));
}
// use 2-sec delay in waiting for pod logs mode
- try {
- Thread.sleep(2000L);
- } catch (InterruptedException e) {
- printer().printf("Interrupted while waiting for pod - %s%n",
e.getMessage());
- }
- return resumeCount < maxWaitAttempts;
+ sleepWell();
+ return resumeCount.get() < maxWaitAttempts;
}
String containerName = null;
@@ -145,11 +144,20 @@ public class PodLogs extends KubernetesBaseCommand {
String line;
while ((line = reader.readLine()) != null) {
printer().println(line);
+ resumeCount.set(0);
}
} catch (IOException e) {
printer().println("Failed to read pod logs - " + e.getMessage());
}
- return resumeCount < 25;
+ return resumeCount.get() < maxWaitAttempts;
+ }
+
+ private void sleepWell() {
+ try {
+ Thread.sleep(2000L);
+ } catch (InterruptedException e) {
+ printer().printf("Interrupted while waiting for pod - %s%n",
e.getMessage());
+ }
}
}