This is an automated email from the ASF dual-hosted git repository. nicknezis pushed a commit to branch nicknezis/k8s-remote-debug in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit 95c3224bda3cb6e55f9da815806b2ec22ecf08e9 Author: Nicholas Nezis <[email protected]> AuthorDate: Fri Jul 30 10:59:38 2021 -0400 Adding missing logic to provide Remote Debug ports to the Executor --- .../scheduler/kubernetes/KubernetesConstants.java | 2 +- .../heron/scheduler/kubernetes/V1Controller.java | 27 ++++++++++++++-------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java index e411818..be45918 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesConstants.java @@ -68,7 +68,7 @@ public final class KubernetesConstants { public static final int CHECKPOINT_MGR_PORT = 6009; // port number the start with when more than one port needed for remote debugging public static final int JVM_REMOTE_DEBUGGER_PORT = 6010; - public static final String JVM_REMOTE_DEBUGGER_PORT_NAME = "remote-debugger"; + public static final String JVM_REMOTE_DEBUGGER_PORT_NAME = "rmt-debug"; public static final Map<ExecutorPort, Integer> EXECUTOR_PORTS = new HashMap<>(); static { diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java index a236462..747ed3c 100644 --- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java +++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -109,8 +110,7 @@ public class V1Controller extends KubernetesController { final V1Service topologyService = createTopologyService(); try { - final V1Service response = - coreClient.createNamespacedService(getNamespace(), topologyService, null, + coreClient.createNamespacedService(getNamespace(), topologyService, null, null, null); } catch (ApiException e) { KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology service", e); @@ -126,8 +126,7 @@ public class V1Controller extends KubernetesController { final V1StatefulSet statefulSet = createStatefulSet(containerResource, numberOfInstances); try { - final V1StatefulSet response = - appsClient.createNamespacedStatefulSet(getNamespace(), statefulSet, null, + appsClient.createNamespacedStatefulSet(getNamespace(), statefulSet, null, null, null); } catch (ApiException e) { KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology", e); @@ -297,15 +296,26 @@ public class V1Controller extends KubernetesController { + "] in namespace [" + getNamespace() + "] is deleted."); } - protected List<String> getExecutorCommand(String containerId) { + protected List<String> getExecutorCommand(String containerId, int numOfInstances) { + final Config configuration = getConfiguration(); + final Config runtimeConfiguration = getRuntimeConfiguration(); final Map<ExecutorPort, String> ports = KubernetesConstants.EXECUTOR_PORTS.entrySet() .stream() .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toString())); - final Config configuration = getConfiguration(); - final Config runtimeConfiguration = getRuntimeConfiguration(); + if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration)) + && numOfInstances != 0) { + List<String> remoteDebuggingPorts = new LinkedList<>(); + IntStream.range(0, numOfInstances).forEach(i -> { + int port = KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i; + remoteDebuggingPorts.add(String.valueOf(port)); + }); + ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS, + String.join(",", remoteDebuggingPorts)); + } + final String[] executorCommand = SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration, containerId, ports); @@ -383,7 +393,7 @@ public class V1Controller extends KubernetesController { templateMetaData.annotations(annotations); podTemplateSpec.setMetadata(templateMetaData); - final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID); + final List<String> command = getExecutorCommand("$" + ENV_SHARD_ID, numberOfInstances); podTemplateSpec.spec(getPodSpec(command, containerResource, numberOfInstances)); statefulSetSpec.setTemplate(podTemplateSpec); @@ -573,7 +583,6 @@ public class V1Controller extends KubernetesController { ports.add(port); }); - if (remoteDebugEnabled) { IntStream.range(0, numberOfInstances).forEach(i -> { final String portName =
