Repository: asterixdb Updated Branches: refs/heads/master fad840a64 -> 93c8b4cdf
[NO ISSUE][OTH] Add CC/NC Ping Function - user model changes: no - storage format changes: no - interface changes: no Details: - Add ping function that can be used to ask a node to ping the CC. Change-Id: I676e523dccbf94d1e5af4ea408e026af260c9b06 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2831 Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> Contrib: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/93c8b4cd Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/93c8b4cd Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/93c8b4cd Branch: refs/heads/master Commit: 93c8b4cdf029fe1a6eab403996c5b745778725aa Parents: fad840a Author: Murtadha Hubail <[email protected]> Authored: Wed Aug 1 16:45:31 2018 -0700 Committer: Murtadha Hubail <[email protected]> Committed: Wed Aug 1 18:16:30 2018 -0700 ---------------------------------------------------------------------- .../control/cc/ClusterControllerIPCI.java | 4 ++ .../control/common/base/IClusterController.java | 2 + .../control/common/base/INodeController.java | 9 ++++ .../control/common/ipc/CCNCFunctions.java | 35 +++++++++++++++ .../ipc/ClusterControllerRemoteProxy.java | 6 +++ .../common/ipc/NodeControllerRemoteProxy.java | 5 +++ .../hyracks/control/nc/NodeControllerIPCI.java | 6 +++ .../hyracks/control/nc/task/PingTask.java | 45 ++++++++++++++++++++ 8 files changed, 112 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/93c8b4cd/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java index 84cb4bd..7e5d22c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerIPCI.java @@ -161,6 +161,10 @@ class ClusterControllerIPCI implements IIPCI { ccs.getWorkQueue() .schedule(new NotifyThreadDumpResponse(ccs, tdrf.getRequestId(), tdrf.getThreadDumpJSON())); break; + case PING_RESPONSE: + CCNCFunctions.PingResponseFunction prf = (CCNCFunctions.PingResponseFunction) fn; + LOGGER.debug("Received ping response from node {}", prf.getNodeId()); + break; default: LOGGER.warn("Unknown function: " + fn.getFunctionId()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/93c8b4cd/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java index fc0154e..fbaff55 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/IClusterController.java @@ -72,4 +72,6 @@ public interface IClusterController { void getNodeControllerInfos() throws Exception; void notifyThreadDump(String nodeId, String requestId, String threadDumpJSON) throws Exception; + + void notifyPingResponse(String nodeId) throws Exception; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/93c8b4cd/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java index fa835f4..d7941f2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/base/INodeController.java @@ -84,4 +84,13 @@ public interface INodeController { * @throws IPCException */ void sendRegistrationResult(NodeParameters parameters, Exception regFailure) throws IPCException; + + /** + * Sends a request to this {@link INodeController} to ping the + * cluster controller with id {@code ccId} + * + * @param ccId + * @throws IPCException + */ + void ping(CcId ccId) throws IPCException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/93c8b4cd/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java index ce4578d..2522ebe 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/CCNCFunctions.java @@ -113,6 +113,9 @@ public class CCNCFunctions { THREAD_DUMP_REQUEST, THREAD_DUMP_RESPONSE, + PING_REQUEST, + PING_RESPONSE, + OTHER } @@ -1316,6 +1319,19 @@ public class CCNCFunctions { } } + public static class PingFunction extends CCIdentifiedFunction { + private static final long serialVersionUID = 1L; + + public PingFunction(CcId ccId) { + super(ccId); + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.PING_REQUEST; + } + } + public static class ShutdownRequestFunction extends CCIdentifiedFunction { private static final long serialVersionUID = 1L; @@ -1355,6 +1371,25 @@ public class CCNCFunctions { } } + public static class PingResponseFunction extends Function { + private static final long serialVersionUID = 1L; + + private final String nodeId; + + public PingResponseFunction(String nodeId) { + this.nodeId = nodeId; + } + + public String getNodeId() { + return nodeId; + } + + @Override + public FunctionId getFunctionId() { + return FunctionId.PING_RESPONSE; + } + } + public static class SerializerDeserializer implements IPayloadSerializerDeserializer { private final JavaSerializationBasedPayloadSerializerDeserializer javaSerde; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/93c8b4cd/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java index 8e2ec22..06904c2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/ClusterControllerRemoteProxy.java @@ -175,6 +175,12 @@ public class ClusterControllerRemoteProxy implements IClusterController { } @Override + public void notifyPingResponse(String nodeId) throws Exception { + CCNCFunctions.PingResponseFunction fn = new CCNCFunctions.PingResponseFunction(nodeId); + ipcHandle.send(-1, fn, null); + } + + @Override public String toString() { return getClass().getSimpleName() + " [" + ipcHandle.getRemoteAddress() + "]"; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/93c8b4cd/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java index 8242bdc..dd10020 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/ipc/NodeControllerRemoteProxy.java @@ -148,6 +148,11 @@ public class NodeControllerRemoteProxy implements INodeController { ipcHandle.send(-1, new CCNCFunctions.NodeRegistrationResult(parameters, regFailure), null); } + @Override + public void ping(CcId ccId) throws IPCException { + ipcHandle.send(-1, new CCNCFunctions.PingFunction(ccId), null); + } + public InetSocketAddress getAddress() { return ipcHandle.getRemoteAddress(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/93c8b4cd/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java index 08cd5d8..cdc16fa 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerIPCI.java @@ -28,6 +28,7 @@ import org.apache.hyracks.control.nc.work.ApplicationMessageWork; import org.apache.hyracks.control.nc.work.CleanupJobletWork; import org.apache.hyracks.control.nc.work.DeployBinaryWork; import org.apache.hyracks.control.nc.work.DeployJobSpecWork; +import org.apache.hyracks.control.nc.task.PingTask; import org.apache.hyracks.control.nc.work.ReportPartitionAvailabilityWork; import org.apache.hyracks.control.nc.work.StartTasksWork; import org.apache.hyracks.control.nc.work.StateDumpWork; @@ -133,6 +134,11 @@ final class NodeControllerIPCI implements IIPCI { ncs.getExecutor().submit(new ThreadDumpTask(ncs, tdrf.getRequestId(), tdrf.getCcId())); return; + case PING_REQUEST: + final CCNCFunctions.PingFunction pcf = (CCNCFunctions.PingFunction) fn; + ncs.getExecutor().submit(new PingTask(ncs, pcf.getCcId())); + return; + default: throw new IllegalArgumentException("Unknown function: " + fn.getFunctionId()); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/93c8b4cd/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java new file mode 100644 index 0000000..15c62bd --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/task/PingTask.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.control.nc.task; + +import org.apache.hyracks.api.control.CcId; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class PingTask implements Runnable { + + private static final Logger LOGGER = LogManager.getLogger(); + private final NodeControllerService ncs; + private final CcId ccId; + + public PingTask(NodeControllerService ncs, CcId ccId) { + this.ncs = ncs; + this.ccId = ccId; + } + + @Override + public void run() { + try { + ncs.getClusterController(ccId).notifyPingResponse(ncs.getId()); + } catch (Exception e) { + LOGGER.info("failed to respond to ping from cc {}", ccId, e); + } + } +} \ No newline at end of file
