This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 1e367fd1fc74e4614b9f7c1eeea1c3155c2a9615 Author: Ali Alsuliman <[email protected]> AuthorDate: Tue Jul 4 21:30:31 2023 -0700 [ASTERIXDB-3216][NET] Always resolve NC address on node registration - user model changes: no - storage format changes: no - interface changes: no Details: Always resolve NC address on node registration. - add some loggings Change-Id: I37477de0a932439e301d8ff6f88de1355d612736 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17628 Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> --- .../http/server/NCQueryCancellationServlet.java | 3 ++ .../api/http/server/NCQueryServiceServlet.java | 3 +- .../app/active/ActiveEntityEventsListener.java | 10 ++--- .../app/active/ActiveNotificationHandler.java | 52 +++++++++++----------- .../asterix/app/message/CancelQueryRequest.java | 8 +++- .../message/ExecuteStatementRequestMessage.java | 4 +- .../asterix/app/translator/QueryTranslator.java | 4 ++ .../apache/hyracks/api/comm/NetworkAddress.java | 7 ++- .../hyracks/control/cc/work/CancelJobWork.java | 5 +++ .../cc/work/JobletCleanupNotificationWork.java | 7 ++- .../hyracks/control/cc/work/RegisterNodeWork.java | 6 ++- .../control/cc/work/WaitForJobCompletionWork.java | 5 +++ .../hyracks/control/nc/NodeControllerService.java | 2 +- .../hyracks/control/nc/work/AbortTasksWork.java | 5 +++ .../hyracks/control/nc/work/CleanupJobletWork.java | 7 ++- .../hyracks/control/nc/work/StartTasksWork.java | 5 +++ 16 files changed, 91 insertions(+), 42 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java index 41f2a53dc2..b2134dcfc6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java @@ -67,6 +67,9 @@ public class NCQueryCancellationServlet extends AbstractServlet { CancelQueryRequest cancelQueryMessage = new CancelQueryRequest(serviceCtx.getNodeId(), cancelQueryFuture.getFutureId(), uuid, clientContextId); // TODO(mblow): multicc -- need to send cancellation to the correct cc + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sending CancelQueryRequest with uuid:{}, clientContextID:{}", uuid, clientContextId); + } messageBroker.sendMessageToPrimaryCC(cancelQueryMessage); CancelQueryResponse cancelResponse = (CancelQueryResponse) cancelQueryFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java index 1ec74055d0..9fa479c398 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java @@ -165,7 +165,8 @@ public class NCQueryServiceServlet extends QueryServiceServlet { CancelQueryRequest cancelQueryMessage = new CancelQueryRequest(nodeId, cancelQueryFuture.getFutureId(), uuid, clientContextID); // TODO(mblow): multicc -- need to send cancellation to the correct cc - LOGGER.info("Cancelling query due to {}", exception.getClass().getSimpleName()); + LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due to {}", uuid, clientContextID, + exception.getClass().getSimpleName()); messageBroker.sendMessageToPrimaryCC(cancelQueryMessage); if (wait) { cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index ddd3d64682..3fd339e8df 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -195,12 +195,12 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @SuppressWarnings("unchecked") protected void finish(ActiveEvent event) throws HyracksDataException { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "the job {} finished", jobId); + LOGGER.log(level, "Active job {} finished", jobId); } JobId lastJobId = jobId; if (numRegistered != numDeRegistered) { LOGGER.log(Level.WARN, - "the job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId, + "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId, numRegistered, numDeRegistered); } jobId = null; @@ -208,7 +208,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl JobStatus jobStatus = status.getLeft(); List<Exception> exceptions = status.getRight(); if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "The job finished with status: {}", jobStatus); + LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus); } if (!jobSuccessfullyTerminated(jobStatus)) { jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) @@ -372,7 +372,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void recover() { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Recover is called on " + entityId); + LOGGER.log(level, "Recover is called on {}", entityId); } if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); @@ -456,7 +456,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl try { metadataProvider.getApplicationContext().getHcc().cancelJob(jobId); } catch (Throwable th) { - LOGGER.warn("Failed to cancel active job", th); + LOGGER.warn("Failed to cancel active job {}", jobId, th); e.addSuppressed(th); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 284929f5ae..6b3581e66c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -66,22 +66,24 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override protected void handle(ActiveEvent event) { - EntityId entityId = jobId2EntityId.get(event.getJobId()); + JobId jobId = event.getJobId(); + Kind eventKind = event.getEventKind(); + EntityId entityId = jobId2EntityId.get(jobId); if (entityId != null) { IActiveEntityEventsListener listener = entityEventListeners.get(entityId); if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Next event is of type " + event.getEventKind()); + LOGGER.log(level, "Next event is {} for job {}", eventKind, jobId); } - if (event.getEventKind() == Kind.JOB_FINISHED) { - LOGGER.log(level, "Removing the job"); - jobId2EntityId.remove(event.getJobId()); + if (eventKind == Kind.JOB_FINISHED) { + LOGGER.log(level, "Removing job {}", jobId); + jobId2EntityId.remove(jobId); } if (listener != null) { LOGGER.log(level, "Notifying the listener"); listener.notify(event); } } else { - LOGGER.log(Level.ERROR, "Entity not found for received message for job " + event.getJobId()); + LOGGER.log(Level.ERROR, "Entity not found for event {} for job {}", eventKind, jobId); } } @@ -90,45 +92,43 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, - "notifyJobCreation(JobId jobId, JobSpecification jobSpecification) was called with jobId = " - + jobId); + LOGGER.log(level, "notifyJobCreation was called for job {}", jobId); } Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); if (!(property instanceof EntityId)) { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Job is not of type active job. property found to be: " + property); + LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property); } return; } EntityId entityId = (EntityId) property; monitorJob(jobId, entityId); boolean found = jobId2EntityId.get(jobId) != null; - LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive")); + LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive")); add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification)); } private synchronized void monitorJob(JobId jobId, EntityId entityId) { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) called with job id: " + jobId); + LOGGER.log(level, "monitorJob was called for job {}", jobId); } boolean found = jobId2EntityId.get(jobId) != null; if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Job was found to be: " + (found ? "Active" : "Inactive")); + LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive")); } if (entityEventListeners.containsKey(entityId)) { if (jobId2EntityId.containsKey(jobId)) { if (LOGGER.isErrorEnabled()) { - LOGGER.error("Job is already being monitored for job: " + jobId); + LOGGER.error("Job {} is already being monitored", jobId); } return; } if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "monitoring started for job id: " + jobId); + LOGGER.log(level, "Monitoring started for job {}", jobId); } } else { if (LOGGER.isEnabled(level)) { - LOGGER.info("No listener was found for the entity: " + entityId); + LOGGER.info("No listener was found for the entity {} for job {}", entityId, jobId); } } jobId2EntityId.put(jobId, entityId); @@ -146,14 +146,14 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Getting notified of job finish for JobId: " + jobId); + LOGGER.log(level, "Getting notified of job finish for job {}", jobId); } EntityId entityId = jobId2EntityId.get(jobId); if (entityId != null) { add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions))); } else { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "no need to notify job finish"); + LOGGER.log(level, "No need to notify JOB_FINISHED for job {}", jobId); } } } @@ -169,11 +169,11 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public IActiveEntityEventsListener getListener(EntityId entityId) { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was called with entity " + entityId); + LOGGER.log(level, "getActiveEntityListener was called with entity {}", entityId); } IActiveEntityEventsListener listener = entityEventListeners.get(entityId); if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Listener found: " + listener); + LOGGER.log(level, "Listener found: {}", listener); } return entityEventListeners.get(entityId); } @@ -192,8 +192,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "registerListener(IActiveEntityEventsListener listener) was called for the entity " - + listener.getEntityId()); + LOGGER.log(level, "registerListener was called for the entity {}", listener.getEntityId()); } if (entityEventListeners.containsKey(listener.getEntityId())) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId()); @@ -207,8 +206,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener listener) was called for the entity " - + listener.getEntityId()); + LOGGER.log(level, "unregisterListener was called for the entity {}", listener.getEntityId()); } IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId()); if (registeredListener == null) { @@ -226,7 +224,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active for (IActiveEntityEventsListener listener : getEventListeners()) { synchronized (listener) { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Entity " + listener.getEntityId() + " is " + listener.getState()); + LOGGER.log(level, "Entity {} is {}", listener.getEntityId(), listener.getState()); } listener.notifyAll(); } @@ -276,11 +274,11 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active public void resumeOrHalt(IActiveEntityEventsListener listener, MetadataProvider metadataProvider) { try { if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Resuming " + listener.getEntityId()); + LOGGER.log(level, "Resuming {}", listener.getEntityId()); } ((ActiveEntityEventsListener) listener).resume(metadataProvider); if (LOGGER.isEnabled(level)) { - LOGGER.log(level, listener.getEntityId() + " resumed"); + LOGGER.log(level, "{} resumed", listener.getEntityId()); } } catch (Throwable th) { // NOSONAR must halt in case of any failure LOGGER.error("Resume active failed", th); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java index 6154faa890..65d1039aff 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java @@ -50,6 +50,7 @@ public class CancelQueryRequest implements ICcAddressedMessage { final IRequestTracker requestTracker = appCtx.getRequestTracker(); IClientRequest req = uuid != null ? requestTracker.get(uuid) : requestTracker.getByClientContextId(contextId); RequestStatus status; + String requestId = ""; if (req == null) { LOGGER.log(Level.INFO, "No request found for uuid {} or context id {}", uuid, contextId); @@ -59,7 +60,8 @@ public class CancelQueryRequest implements ICcAddressedMessage { status = RequestStatus.REJECTED; } else { try { - requestTracker.cancel(req.getId()); + requestId = req.getId(); + requestTracker.cancel(requestId); status = RequestStatus.SUCCESS; } catch (Exception e) { LOGGER.log(Level.WARN, "unexpected exception thrown from cancel", e); @@ -67,6 +69,10 @@ public class CancelQueryRequest implements ICcAddressedMessage { } } } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Sending CancelQueryResponse to {}. requestId:{}, uuid:{}, contextId:{}, status:{}", nodeId, + requestId, uuid, contextId, status); + } CancelQueryResponse response = new CancelQueryResponse(reqId, status); CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); try { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index 0a5e0330a8..9b722df12c 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -243,7 +243,7 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { @Override public String toString() { - return String.format("%s(id=%s, from=%s): %s", getClass().getSimpleName(), requestMessageId, requestNodeId, - LogRedactionUtil.statement(statementsText)); + return String.format("%s(id=%s, from=%s, uuid=%s): %s", getClass().getSimpleName(), requestMessageId, + requestNodeId, LogRedactionUtil.statement(statementsText), requestReference.getUuid()); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index e098a89365..afdd0359a4 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -4837,6 +4837,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // ensure request not cancelled before running job ensureNotCancelled(clientRequest); final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("createAndRunJob jobId:{}, uuid:{}", jobId, + requestParameters.getRequestReference().getUuid()); + } clientRequest.setJobId(jobId); if (jId != null) { jId.setValue(jobId); diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java index d74f50044c..75fbb923f3 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java @@ -25,6 +25,7 @@ import java.io.Serializable; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.Objects; import org.apache.hyracks.api.io.IWritable; @@ -82,6 +83,10 @@ public final class NetworkAddress implements IWritable, Serializable { return inetSocketAddress; } + public InetSocketAddress toInetSocketAddress() { + return new InetSocketAddress(address, port); + } + public int getPort() { return port; } @@ -102,7 +107,7 @@ public final class NetworkAddress implements IWritable, Serializable { return false; } NetworkAddress on = (NetworkAddress) o; - return on.port == port && on.address == address; + return on.port == port && Objects.equals(on.address, address); } @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java index e3135df076..9a08e8e2d6 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java @@ -47,4 +47,9 @@ public class CancelJobWork extends SynchronizableWork { callback.setException(e); } } + + @Override + public String toString() { + return getName() + " jobId:" + jobId; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java index 727793b219..c3a09f9da3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java @@ -37,7 +37,7 @@ import org.apache.logging.log4j.Logger; public class JobletCleanupNotificationWork extends AbstractHeartbeatWork { private static final Logger LOGGER = LogManager.getLogger(); - private JobId jobId; + private final JobId jobId; public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId jobId, String nodeId) { super(ccs, nodeId, null); @@ -77,4 +77,9 @@ public class JobletCleanupNotificationWork extends AbstractHeartbeatWork { } } } + + @Override + public String toString() { + return getName() + " jobId:" + jobId + ", nodeId:" + nodeId; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index b1700febb5..ec21785baa 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -18,6 +18,7 @@ */ package org.apache.hyracks.control.cc.work; +import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; @@ -50,8 +51,9 @@ public class RegisterNodeWork extends SynchronizableWork { protected void doRun() throws Exception { String id = reg.getNodeId(); LOGGER.info("registering node: {}", id); - NodeControllerRemoteProxy nc = new NodeControllerRemoteProxy(ccs.getCcId(), - ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress().resolveInetSocketAddress())); + InetSocketAddress ncAddress = reg.getNodeControllerAddress().toInetSocketAddress(); + NodeControllerRemoteProxy nc = + new NodeControllerRemoteProxy(ccs.getCcId(), ccs.getClusterIPC().getReconnectingHandle(ncAddress)); INodeManager nodeManager = ccs.getNodeManager(); NodeParameters params = new NodeParameters(); params.setClusterControllerInfo(ccs.getClusterControllerInfo()); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java index 14d92fb6f5..ed3e574323 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java @@ -87,4 +87,9 @@ public class WaitForJobCompletionWork extends SynchronizableWork { }); } } + + @Override + public String toString() { + return getName() + " jobId:" + jobId; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java index 8c9cbfb6c4..f69d106f1d 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java @@ -426,7 +426,7 @@ public class NodeControllerService implements IControllerService { NodeParameters nodeParameters = ccc.getNodeParameters(); // Start heartbeat generator. heartbeatManagers.computeIfAbsent(ccId, newCcId -> HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(), - nodeRegistration.getNodeControllerAddress().resolveInetSocketAddress())); + nodeRegistration.getNodeControllerAddress().toInetSocketAddress())); if (!ccTimers.containsKey(ccId) && nodeParameters.getProfileDumpPeriod() > 0) { Timer ccTimer = new Timer("Timer-" + ccId, true); // Schedule profile dump generator. diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java index f47e1ce957..bfe370659f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java @@ -70,4 +70,9 @@ public class AbortTasksWork extends AbstractWork { "Joblet couldn't be found. Tasks of job " + jobId + " have all either completed or failed"); } } + + @Override + public String toString() { + return getName() + " jobId:" + jobId; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java index ae2cfa098e..75edd38b00 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java @@ -35,7 +35,7 @@ public class CleanupJobletWork extends AbstractWork { private final JobId jobId; - private JobStatus status; + private final JobStatus status; public CleanupJobletWork(NodeControllerService ncs, JobId jobId, JobStatus status) { this.ncs = ncs; @@ -54,4 +54,9 @@ public class CleanupJobletWork extends AbstractWork { joblet.cleanup(status); } } + + @Override + public String toString() { + return getName() + " jobId:" + jobId + ", status:" + status; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index f6c144db93..f277046496 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -315,4 +315,9 @@ public class StartTasksWork extends AbstractWork { } return channelsForInputConnectors; } + + @Override + public String toString() { + return getName() + " jobId:" + jobId; + } }
