This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 1e367fd1fc74e4614b9f7c1eeea1c3155c2a9615
Author: Ali Alsuliman <[email protected]>
AuthorDate: Tue Jul 4 21:30:31 2023 -0700

    [ASTERIXDB-3216][NET] Always resolve NC address on node registration
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    Always resolve NC address on node registration.
    
    - add some logging
    
    Change-Id: I37477de0a932439e301d8ff6f88de1355d612736
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17628
    Reviewed-by: Michael Blow <[email protected]>
    Tested-by: Michael Blow <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
---
 .../http/server/NCQueryCancellationServlet.java    |  3 ++
 .../api/http/server/NCQueryServiceServlet.java     |  3 +-
 .../app/active/ActiveEntityEventsListener.java     | 10 ++---
 .../app/active/ActiveNotificationHandler.java      | 52 +++++++++++-----------
 .../asterix/app/message/CancelQueryRequest.java    |  8 +++-
 .../message/ExecuteStatementRequestMessage.java    |  4 +-
 .../asterix/app/translator/QueryTranslator.java    |  4 ++
 .../apache/hyracks/api/comm/NetworkAddress.java    |  7 ++-
 .../hyracks/control/cc/work/CancelJobWork.java     |  5 +++
 .../cc/work/JobletCleanupNotificationWork.java     |  7 ++-
 .../hyracks/control/cc/work/RegisterNodeWork.java  |  6 ++-
 .../control/cc/work/WaitForJobCompletionWork.java  |  5 +++
 .../hyracks/control/nc/NodeControllerService.java  |  2 +-
 .../hyracks/control/nc/work/AbortTasksWork.java    |  5 +++
 .../hyracks/control/nc/work/CleanupJobletWork.java |  7 ++-
 .../hyracks/control/nc/work/StartTasksWork.java    |  5 +++
 16 files changed, 91 insertions(+), 42 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
index 41f2a53dc2..b2134dcfc6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryCancellationServlet.java
@@ -67,6 +67,9 @@ public class NCQueryCancellationServlet extends 
AbstractServlet {
             CancelQueryRequest cancelQueryMessage = new 
CancelQueryRequest(serviceCtx.getNodeId(),
                     cancelQueryFuture.getFutureId(), uuid, clientContextId);
             // TODO(mblow): multicc -- need to send cancellation to the 
correct cc
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("Sending CancelQueryRequest with uuid:{}, 
clientContextID:{}", uuid, clientContextId);
+            }
             messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
             CancelQueryResponse cancelResponse =
                     (CancelQueryResponse) 
cancelQueryFuture.get(DEFAULT_NC_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
index 1ec74055d0..9fa479c398 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/NCQueryServiceServlet.java
@@ -165,7 +165,8 @@ public class NCQueryServiceServlet extends 
QueryServiceServlet {
             CancelQueryRequest cancelQueryMessage =
                     new CancelQueryRequest(nodeId, 
cancelQueryFuture.getFutureId(), uuid, clientContextID);
             // TODO(mblow): multicc -- need to send cancellation to the 
correct cc
-            LOGGER.info("Cancelling query due to {}", 
exception.getClass().getSimpleName());
+            LOGGER.info("Cancelling query with uuid:{}, clientContextID:{} due 
to {}", uuid, clientContextID,
+                    exception.getClass().getSimpleName());
             messageBroker.sendMessageToPrimaryCC(cancelQueryMessage);
             if (wait) {
                 
cancelQueryFuture.get(ExecuteStatementRequestMessage.DEFAULT_QUERY_CANCELLATION_WAIT_MILLIS,
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index ddd3d64682..3fd339e8df 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -195,12 +195,12 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "the job {} finished", jobId);
+            LOGGER.log(level, "Active job {} finished", jobId);
         }
         JobId lastJobId = jobId;
         if (numRegistered != numDeRegistered) {
             LOGGER.log(Level.WARN,
-                    "the job {} finished with reported runtime registrations = 
{} and deregistrations = {}", jobId,
+                    "Active job {} finished with reported runtime 
registrations = {} and deregistrations = {}", jobId,
                     numRegistered, numDeRegistered);
         }
         jobId = null;
@@ -208,7 +208,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
         JobStatus jobStatus = status.getLeft();
         List<Exception> exceptions = status.getRight();
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "The job finished with status: {}", jobStatus);
+            LOGGER.log(level, "Active job {} finished with status {}", 
lastJobId, jobStatus);
         }
         if (!jobSuccessfullyTerminated(jobStatus)) {
             jobFailure = exceptions.isEmpty() ? new 
RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
@@ -372,7 +372,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
     @Override
     public synchronized void recover() {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Recover is called on " + entityId);
+            LOGGER.log(level, "Recover is called on {}", entityId);
         }
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
             LOGGER.log(level, "But it has no recovery policy, so it is set to 
permanent failure");
@@ -456,7 +456,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
         try {
             metadataProvider.getApplicationContext().getHcc().cancelJob(jobId);
         } catch (Throwable th) {
-            LOGGER.warn("Failed to cancel active job", th);
+            LOGGER.warn("Failed to cancel active job {}", jobId, th);
             e.addSuppressed(th);
         }
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 284929f5ae..6b3581e66c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -66,22 +66,24 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
 
     @Override
     protected void handle(ActiveEvent event) {
-        EntityId entityId = jobId2EntityId.get(event.getJobId());
+        JobId jobId = event.getJobId();
+        Kind eventKind = event.getEventKind();
+        EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Next event is of type " + 
event.getEventKind());
+                LOGGER.log(level, "Next event is {} for job {}", eventKind, 
jobId);
             }
-            if (event.getEventKind() == Kind.JOB_FINISHED) {
-                LOGGER.log(level, "Removing the job");
-                jobId2EntityId.remove(event.getJobId());
+            if (eventKind == Kind.JOB_FINISHED) {
+                LOGGER.log(level, "Removing job {}", jobId);
+                jobId2EntityId.remove(jobId);
             }
             if (listener != null) {
                 LOGGER.log(level, "Notifying the listener");
                 listener.notify(event);
             }
         } else {
-            LOGGER.log(Level.ERROR, "Entity not found for received message for 
job " + event.getJobId());
+            LOGGER.log(Level.ERROR, "Entity not found for event {} for job 
{}", eventKind, jobId);
         }
     }
 
@@ -90,45 +92,43 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
     @Override
     public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) throws HyracksDataException {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level,
-                    "notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) was called with jobId = "
-                            + jobId);
+            LOGGER.log(level, "notifyJobCreation was called for job {}", 
jobId);
         }
         Object property = 
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (!(property instanceof EntityId)) {
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Job is not of type active job. property 
found to be: " + property);
+                LOGGER.log(level, "Job {} is not of type active job. property 
found to be {}", jobId, property);
             }
             return;
         }
         EntityId entityId = (EntityId) property;
         monitorJob(jobId, entityId);
         boolean found = jobId2EntityId.get(jobId) != null;
-        LOGGER.log(level, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
+        LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? 
"Active" : "Inactive"));
         add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, 
jobSpecification));
     }
 
     private synchronized void monitorJob(JobId jobId, EntityId entityId) {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "monitorJob(JobId jobId, ActiveJob activeJob) 
called with job id: " + jobId);
+            LOGGER.log(level, "monitorJob was called for job {}", jobId);
         }
         boolean found = jobId2EntityId.get(jobId) != null;
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Job was found to be: " + (found ? "Active" : 
"Inactive"));
+            LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? 
"Active" : "Inactive"));
         }
         if (entityEventListeners.containsKey(entityId)) {
             if (jobId2EntityId.containsKey(jobId)) {
                 if (LOGGER.isErrorEnabled()) {
-                    LOGGER.error("Job is already being monitored for job: " + 
jobId);
+                    LOGGER.error("Job {} is already being monitored", jobId);
                 }
                 return;
             }
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "monitoring started for job id: " + jobId);
+                LOGGER.log(level, "Monitoring started for job {}", jobId);
             }
         } else {
             if (LOGGER.isEnabled(level)) {
-                LOGGER.info("No listener was found for the entity: " + 
entityId);
+                LOGGER.info("No listener was found for the entity {} for job 
{}", entityId, jobId);
             }
         }
         jobId2EntityId.put(jobId, entityId);
@@ -146,14 +146,14 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
     public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions)
             throws HyracksException {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Getting notified of job finish for JobId: " + 
jobId);
+            LOGGER.log(level, "Getting notified of job finish for job {}", 
jobId);
         }
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, 
Pair.of(jobStatus, exceptions)));
         } else {
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "no need to notify job finish");
+                LOGGER.log(level, "No need to notify JOB_FINISHED for job {}", 
jobId);
             }
         }
     }
@@ -169,11 +169,11 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
     @Override
     public IActiveEntityEventsListener getListener(EntityId entityId) {
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "getActiveEntityListener(EntityId entityId) was 
called with entity " + entityId);
+            LOGGER.log(level, "getActiveEntityListener was called with entity 
{}", entityId);
         }
         IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Listener found: " + listener);
+            LOGGER.log(level, "Listener found: {}", listener);
         }
         return entityEventListeners.get(entityId);
     }
@@ -192,8 +192,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "registerListener(IActiveEntityEventsListener 
listener) was called for the entity "
-                    + listener.getEntityId());
+            LOGGER.log(level, "registerListener was called for the entity {}", 
listener.getEntityId());
         }
         if (entityEventListeners.containsKey(listener.getEntityId())) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, 
listener.getEntityId());
@@ -207,8 +206,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
         if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "unregisterListener(IActiveEntityEventsListener 
listener) was called for the entity "
-                    + listener.getEntityId());
+            LOGGER.log(level, "unregisterListener was called for the entity 
{}", listener.getEntityId());
         }
         IActiveEntityEventsListener registeredListener = 
entityEventListeners.remove(listener.getEntityId());
         if (registeredListener == null) {
@@ -226,7 +224,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         for (IActiveEntityEventsListener listener : getEventListeners()) {
             synchronized (listener) {
                 if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, "Entity " + listener.getEntityId() + " 
is " + listener.getState());
+                    LOGGER.log(level, "Entity {} is {}", 
listener.getEntityId(), listener.getState());
                 }
                 listener.notifyAll();
             }
@@ -276,11 +274,11 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
     public void resumeOrHalt(IActiveEntityEventsListener listener, 
MetadataProvider metadataProvider) {
         try {
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Resuming " + listener.getEntityId());
+                LOGGER.log(level, "Resuming {}", listener.getEntityId());
             }
             ((ActiveEntityEventsListener) listener).resume(metadataProvider);
             if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, listener.getEntityId() + " resumed");
+                LOGGER.log(level, "{} resumed", listener.getEntityId());
             }
         } catch (Throwable th) { // NOSONAR must halt in case of any failure
             LOGGER.error("Resume active failed", th);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 6154faa890..65d1039aff 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -50,6 +50,7 @@ public class CancelQueryRequest implements 
ICcAddressedMessage {
         final IRequestTracker requestTracker = appCtx.getRequestTracker();
         IClientRequest req = uuid != null ? requestTracker.get(uuid) : 
requestTracker.getByClientContextId(contextId);
         RequestStatus status;
+        String requestId = "";
 
         if (req == null) {
             LOGGER.log(Level.INFO, "No request found for uuid {} or context id 
{}", uuid, contextId);
@@ -59,7 +60,8 @@ public class CancelQueryRequest implements 
ICcAddressedMessage {
                 status = RequestStatus.REJECTED;
             } else {
                 try {
-                    requestTracker.cancel(req.getId());
+                    requestId = req.getId();
+                    requestTracker.cancel(requestId);
                     status = RequestStatus.SUCCESS;
                 } catch (Exception e) {
                     LOGGER.log(Level.WARN, "unexpected exception thrown from 
cancel", e);
@@ -67,6 +69,10 @@ public class CancelQueryRequest implements 
ICcAddressedMessage {
                 }
             }
         }
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("Sending CancelQueryResponse to {}. requestId:{}, 
uuid:{}, contextId:{}, status:{}", nodeId,
+                    requestId, uuid, contextId, status);
+        }
         CancelQueryResponse response = new CancelQueryResponse(reqId, status);
         CCMessageBroker messageBroker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
         try {
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
index 0a5e0330a8..9b722df12c 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java
@@ -243,7 +243,7 @@ public class ExecuteStatementRequestMessage implements 
ICcAddressedMessage {
 
     @Override
     public String toString() {
-        return String.format("%s(id=%s, from=%s): %s", 
getClass().getSimpleName(), requestMessageId, requestNodeId,
-                LogRedactionUtil.statement(statementsText));
+        return String.format("%s(id=%s, from=%s, uuid=%s): %s", 
getClass().getSimpleName(), requestMessageId,
+                requestNodeId, requestReference.getUuid(), 
LogRedactionUtil.statement(statementsText));
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index e098a89365..afdd0359a4 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -4837,6 +4837,10 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             // ensure request not cancelled before running job
             ensureNotCancelled(clientRequest);
             final JobId jobId = JobUtils.runJob(hcc, jobSpec, jobFlags, false);
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("createAndRunJob jobId:{}, uuid:{}", jobId,
+                        requestParameters.getRequestReference().getUuid());
+            }
             clientRequest.setJobId(jobId);
             if (jId != null) {
                 jId.setValue(jobId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
index d74f50044c..75fbb923f3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/comm/NetworkAddress.java
@@ -25,6 +25,7 @@ import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Objects;
 
 import org.apache.hyracks.api.io.IWritable;
 
@@ -82,6 +83,10 @@ public final class NetworkAddress implements IWritable, 
Serializable {
         return inetSocketAddress;
     }
 
+    public InetSocketAddress toInetSocketAddress() {
+        return new InetSocketAddress(address, port);
+    }
+
     public int getPort() {
         return port;
     }
@@ -102,7 +107,7 @@ public final class NetworkAddress implements IWritable, 
Serializable {
             return false;
         }
         NetworkAddress on = (NetworkAddress) o;
-        return on.port == port && on.address == address;
+        return on.port == port && Objects.equals(on.address, address);
     }
 
     @Override
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
index e3135df076..9a08e8e2d6 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/CancelJobWork.java
@@ -47,4 +47,9 @@ public class CancelJobWork extends SynchronizableWork {
             callback.setException(e);
         }
     }
+
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index 727793b219..c3a09f9da3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -37,7 +37,7 @@ import org.apache.logging.log4j.Logger;
 public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private JobId jobId;
+    private final JobId jobId;
 
     public JobletCleanupNotificationWork(ClusterControllerService ccs, JobId 
jobId, String nodeId) {
         super(ccs, nodeId, null);
@@ -77,4 +77,9 @@ public class JobletCleanupNotificationWork extends 
AbstractHeartbeatWork {
             }
         }
     }
+
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId + ", nodeId:" + nodeId;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index b1700febb5..ec21785baa 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hyracks.control.cc.work;
 
+import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -50,8 +51,9 @@ public class RegisterNodeWork extends SynchronizableWork {
     protected void doRun() throws Exception {
         String id = reg.getNodeId();
         LOGGER.info("registering node: {}", id);
-        NodeControllerRemoteProxy nc = new 
NodeControllerRemoteProxy(ccs.getCcId(),
-                
ccs.getClusterIPC().getReconnectingHandle(reg.getNodeControllerAddress().resolveInetSocketAddress()));
+        InetSocketAddress ncAddress = 
reg.getNodeControllerAddress().toInetSocketAddress();
+        NodeControllerRemoteProxy nc =
+                new NodeControllerRemoteProxy(ccs.getCcId(), 
ccs.getClusterIPC().getReconnectingHandle(ncAddress));
         INodeManager nodeManager = ccs.getNodeManager();
         NodeParameters params = new NodeParameters();
         params.setClusterControllerInfo(ccs.getClusterControllerInfo());
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index 14d92fb6f5..ed3e574323 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -87,4 +87,9 @@ public class WaitForJobCompletionWork extends 
SynchronizableWork {
             });
         }
     }
+
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
index 8c9cbfb6c4..f69d106f1d 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/NodeControllerService.java
@@ -426,7 +426,7 @@ public class NodeControllerService implements 
IControllerService {
         NodeParameters nodeParameters = ccc.getNodeParameters();
         // Start heartbeat generator.
         heartbeatManagers.computeIfAbsent(ccId, newCcId -> 
HeartbeatManager.init(this, ccc, hbTask.getHeartbeatData(),
-                
nodeRegistration.getNodeControllerAddress().resolveInetSocketAddress()));
+                
nodeRegistration.getNodeControllerAddress().toInetSocketAddress()));
         if (!ccTimers.containsKey(ccId) && 
nodeParameters.getProfileDumpPeriod() > 0) {
             Timer ccTimer = new Timer("Timer-" + ccId, true);
             // Schedule profile dump generator.
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
index f47e1ce957..bfe370659f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/AbortTasksWork.java
@@ -70,4 +70,9 @@ public class AbortTasksWork extends AbstractWork {
                     "Joblet couldn't be found. Tasks of job " + jobId + " have 
all either completed or failed");
         }
     }
+
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index ae2cfa098e..75edd38b00 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -35,7 +35,7 @@ public class CleanupJobletWork extends AbstractWork {
 
     private final JobId jobId;
 
-    private JobStatus status;
+    private final JobStatus status;
 
     public CleanupJobletWork(NodeControllerService ncs, JobId jobId, JobStatus 
status) {
         this.ncs = ncs;
@@ -54,4 +54,9 @@ public class CleanupJobletWork extends AbstractWork {
             joblet.cleanup(status);
         }
     }
+
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId + ", status:" + status;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index f6c144db93..f277046496 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -315,4 +315,9 @@ public class StartTasksWork extends AbstractWork {
         }
         return channelsForInputConnectors;
     }
+
+    @Override
+    public String toString() {
+        return getName() + " jobId:" + jobId;
+    }
 }

Reply via email to