This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit f124592d2be9dfc37b871c2bcd34b30bc5980902
Author: Ali Alsuliman <[email protected]>
AuthorDate: Sat Jun 8 03:05:28 2024 +0300

    [NO ISSUE][OTH] Logging enhancements + query job logging
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    - Reduce some JobWork logging level to TRACE.
    - Remove logs that are not useful.
    - Add context to cancel request/response messages.
    - Avoid unnecessary NPEs when closing the pipeline, which caused
      unneeded stack traces in logs.
    - Add method to get Job Queue size.
    - Add method to IJobCapacityController to get the cluster's
      current capacity for logging.
    
    This patch includes backports from:
    https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18318
    https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18016
    https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18237
    
    Change-Id: I0389e693493d99a12483b94c50eeb1697f69515f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18353
    Reviewed-by: Murtadha Hubail <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../org/apache/asterix/active/ActiveManager.java   |  3 +-
 .../app/active/ActiveEntityEventsListener.java     | 67 +++++++--------------
 .../app/active/ActiveNotificationHandler.java      | 70 ++++++----------------
 .../asterix/app/message/CancelQueryRequest.java    | 11 ++--
 .../asterix/app/message/CancelQueryResponse.java   |  4 ++
 .../evaluators/functions/SleepDescriptor.java      |  8 +--
 .../job/resource/JobCapacityController.java        |  5 ++
 ...stractOneInputOneOutputOneFramePushRuntime.java |  2 +-
 .../job/resource/DefaultJobCapacityController.java | 10 ++++
 .../api/job/resource/IJobCapacityController.java   |  6 ++
 .../hyracks/control/cc/executor/JobExecutor.java   | 11 ++--
 .../apache/hyracks/control/cc/job/JobManager.java  | 27 ++++++++-
 .../control/cc/partitions/PartitionMatchMaker.java |  8 +--
 .../control/cc/result/ResultDirectoryService.java  | 25 ++++----
 .../hyracks/control/cc/scheduler/FIFOJobQueue.java |  5 ++
 .../hyracks/control/cc/scheduler/IJobQueue.java    |  7 +++
 .../control/cc/work/ApplicationMessageWork.java    | 10 +++-
 .../cc/work/GetNodeControllersInfoWork.java        |  9 ++-
 .../cc/work/GetResultDirectoryAddressWork.java     |  7 +++
 .../cc/work/GetResultPartitionLocationsWork.java   |  6 ++
 .../hyracks/control/cc/work/JobCleanupWork.java    | 23 ++++---
 .../hyracks/control/cc/work/JobStartWork.java      |  6 ++
 .../cc/work/JobletCleanupNotificationWork.java     |  7 +++
 .../hyracks/control/cc/work/RegisterNodeWork.java  |  6 ++
 .../control/cc/work/RemoveDeadNodesWork.java       |  6 +-
 .../hyracks/control/cc/work/TaskCompleteWork.java  | 11 ++++
 .../hyracks/control/cc/work/TaskFailureWork.java   | 13 ++--
 .../control/cc/work/WaitForJobCompletionWork.java  |  6 ++
 .../MaterializingPipelinedPartition.java           |  7 ---
 .../control/nc/work/ApplicationMessageWork.java    | 14 +++--
 .../hyracks/control/nc/work/CleanupJobletWork.java |  9 ++-
 .../control/nc/work/NotifyTaskCompleteWork.java    | 18 ++++--
 .../control/nc/work/NotifyTaskFailureWork.java     |  9 ++-
 .../hyracks/control/nc/work/StartTasksWork.java    |  2 +-
 .../OptimizedHybridHashJoinOperatorDescriptor.java | 28 ++-------
 .../dataflow/std/sort/TupleSorterHeapSort.java     |  9 ---
 .../AbstractMultiNCIntegrationTest.java            | 10 ++++
 .../storage/am/lsm/common/util/ComponentUtils.java | 51 +++-------------
 38 files changed, 283 insertions(+), 253 deletions(-)

diff --git 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
index b99e4f2aec..ed481bc666 100644
--- 
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
+++ 
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java
@@ -104,9 +104,9 @@ public class ActiveManager {
     }
 
     public void submit(ActiveManagerMessage message) throws 
HyracksDataException {
-        LOGGER.debug("Message of type {} received in {}", message.getKind(), 
nodeId);
         switch (message.getKind()) {
             case STOP_ACTIVITY:
+                LOGGER.debug("Message of type {} received in {}", 
message.getKind(), nodeId);
                 stopRuntime(message);
                 break;
             case REQUEST_STATS:
@@ -151,7 +151,6 @@ public class ActiveManager {
                 return;
             }
             String stats = runtime.getStats();
-            LOGGER.debug("Sending stats response for {} ", runtimeId);
             ActiveStatsResponse response = new ActiveStatsResponse(reqId, 
stats, null);
             ((NodeControllerService) 
serviceCtx.getControllerService()).sendRealTimeApplicationMessageToCC(
                     message.getCcId(), 
JavaSerializationUtils.serialize(response), null);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
index 626b93828e..7b253f1544 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java
@@ -138,7 +138,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
     }
 
     protected synchronized void setState(ActivityState newState) {
-        LOGGER.log(level, "State of {} is being set to {} from {}", 
getEntityId(), newState, state);
+        LOGGER.log(level, "state of {} is being set from {} to {}", 
getEntityId(), state, newState);
         this.prevState = state;
         this.state = newState;
         if (newState == ActivityState.STARTING || newState == 
ActivityState.RECOVERING
@@ -153,9 +153,8 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
     @Override
     public synchronized void notify(ActiveEvent event) {
         try {
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "EventListener is notified.");
-            }
+            LOGGER.debug("CC handling event {}; state={}, prev state={}, 
suspended={}", event, state, prevState,
+                    suspended);
             ActiveEvent.Kind eventKind = event.getEventKind();
             switch (eventKind) {
                 case JOB_CREATED:
@@ -194,26 +193,21 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
 
     @SuppressWarnings("unchecked")
     protected void finish(ActiveEvent event) throws HyracksDataException {
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Active job {} finished", jobId);
-        }
         JobId lastJobId = jobId;
+        Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, 
List<Exception>>) event.getEventObject();
         if (numRegistered != numDeRegistered) {
             LOGGER.log(Level.WARN,
-                    "Active job {} finished with reported runtime 
registrations = {} and deregistrations = {}", jobId,
-                    numRegistered, numDeRegistered);
+                    "ingestion job {} finished with status={}, reported 
runtime registrations={}, deregistrations={}",
+                    jobId, status, numRegistered, numDeRegistered);
         }
         jobId = null;
-        Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, 
List<Exception>>) event.getEventObject();
         JobStatus jobStatus = status.getLeft();
         List<Exception> exceptions = status.getRight();
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Active job {} finished with status {}", 
lastJobId, jobStatus);
-        }
+        LOGGER.debug("ingestion job {} finished with status {}", lastJobId, 
jobStatus);
         if (!jobSuccessfullyTerminated(jobStatus)) {
             jobFailure = exceptions.isEmpty() ? new 
RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION)
                     : exceptions.get(0);
-            LOGGER.error("Active Job {} failed", lastJobId, jobFailure);
+            LOGGER.error("ingestion job {} failed", lastJobId, jobFailure);
             setState((state == ActivityState.STOPPING || state == 
ActivityState.CANCELLING) ? ActivityState.STOPPED
                     : ActivityState.TEMPORARILY_FAILED);
             if (prevState == ActivityState.RUNNING) {
@@ -371,16 +365,14 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
 
     @Override
     public synchronized void recover() {
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Recover is called on {}", entityId);
-        }
         if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) {
-            LOGGER.log(level, "But it has no recovery policy, so it is set to 
permanent failure");
+            LOGGER.debug("recover is called on {} w/o recovery policy; setting 
to permanent failure", entityId);
             setState(ActivityState.STOPPED);
         } else {
+            LOGGER.debug("recover is called on {}", entityId);
             ExecutorService executor = 
appCtx.getServiceContext().getControllerService().getExecutor();
             setState(ActivityState.TEMPORARILY_FAILED);
-            LOGGER.log(level, "Recovery task has been submitted");
+            LOGGER.debug("recovery task has been submitted");
             rt = createRecoveryTask();
             executor.submit(rt.recover());
         }
@@ -479,15 +471,11 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
         // Note: once we start sending stop messages, we can't go back until 
the entity is stopped
         final String nameBefore = Thread.currentThread().getName();
         try {
-            Thread.currentThread().setName(nameBefore + " : 
WaitForCompletionForJobId: " + jobId);
+            Thread.currentThread().setName(nameBefore + " : 
wait-for-ingestion-completion: " + jobId);
             sendStopMessages(metadataProvider, timeout, unit);
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Waiting for its state to become " + waitFor);
-            }
+            LOGGER.debug("waiting for {} to become {}", jobId, waitFor);
             subscriber.sync();
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Disconnect has been completed " + waitFor);
-            }
+            LOGGER.debug("disconnect has been completed {}", waitFor);
         } catch (InterruptedException ie) {
             forceStop(subscriber, ie);
             Thread.currentThread().interrupt();
@@ -513,13 +501,8 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
         ICCMessageBroker messageBroker = (ICCMessageBroker) 
applicationCtx.getServiceContext().getMessageBroker();
         AlgebricksAbsolutePartitionConstraint runtimeLocations = 
getLocations();
         int partition = 0;
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.log(Level.INFO, "Sending stop messages to " + 
runtimeLocations);
-        }
+        LOGGER.log(Level.INFO, "sending stop messages to {}", 
runtimeLocations);
         for (String location : runtimeLocations.getLocations()) {
-            if (LOGGER.isInfoEnabled()) {
-                LOGGER.log(Level.INFO, "Sending to " + location);
-            }
             ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++);
             messageBroker.sendApplicationMessageToNC(new 
ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY,
                     runtimeId, new StopRuntimeParameters(timeout, unit)), 
location);
@@ -581,14 +564,10 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
         WaitForStateSubscriber subscriber;
         Future<Void> suspendTask;
         synchronized (this) {
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "suspending entity " + entityId);
-                LOGGER.log(level, "Waiting for ongoing activities");
-            }
+            LOGGER.log(level, "{} suspending entity {}", jobId, entityId);
+            LOGGER.log(level, "{} waiting for ongoing activities", jobId);
             waitForNonTransitionState();
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Proceeding with suspension. Current state 
is " + state);
-            }
+            LOGGER.log(level, "{} proceeding with suspension. current state is 
{}", jobId, state);
             if (state == ActivityState.STOPPED) {
                 suspended = true;
                 return;
@@ -609,12 +588,12 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl
                         doSuspend(metadataProvider);
                         return null;
                     });
-            LOGGER.log(level, "Suspension task has been submitted");
+            LOGGER.log(level, "{} suspension task has been submitted", jobId);
         }
         try {
-            LOGGER.log(level, "Waiting for suspension task to complete");
+            LOGGER.log(level, "{} waiting for suspension task to complete", 
jobId);
             suspendTask.get();
-            LOGGER.log(level, "waiting for state to become SUSPENDED or 
TEMPORARILY_FAILED");
+            LOGGER.log(level, "{} waiting for state to become SUSPENDED or 
TEMPORARILY_FAILED", jobId);
             subscriber.sync();
             suspended = true;
         } catch (Exception e) {
@@ -736,7 +715,7 @@ public abstract class ActiveEntityEventsListener implements 
IActiveEntityControl
 
     @Override
     public String toString() {
-        return "{\"class\":\"" + getClass().getSimpleName() + "\"," + 
"\"entityId\":\"" + entityId + "\","
-                + "\"state\":\"" + state + "\"" + "}";
+        return "{\"class\":\"" + getClass().getSimpleName() + "\", 
\"entityId\":\"" + entityId + "\", \"state\":\""
+                + state + "\", \"prev state\":\"" + prevState + "\", 
\"suspended\":" + suspended + "}";
     }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index 3c277d5aa6..662884d48b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -72,16 +72,14 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
             IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Next event is {} for job {}", eventKind, 
jobId);
-            }
             if (eventKind == Kind.JOB_FINISHED) {
-                LOGGER.log(level, "Removing job {}", jobId);
+                LOGGER.debug("removing ingestion job {}", jobId);
                 jobId2EntityId.remove(jobId);
             }
             if (listener != null) {
-                LOGGER.log(level, "Notifying the listener");
                 listener.notify(event);
+            } else {
+                LOGGER.debug("listener not found for entity {} on event={}", 
entityId, event);
             }
         } else {
             LOGGER.log(Level.ERROR, "Entity not found for event {} for job 
{}", eventKind, jobId);
@@ -92,45 +90,29 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
 
     @Override
     public void notifyJobCreation(JobId jobId, JobSpecification 
jobSpecification) throws HyracksDataException {
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "notifyJobCreation was called for job {}", 
jobId);
-        }
         Object property = 
jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME);
         if (!(property instanceof EntityId)) {
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Job {} is not of type active job. property 
found to be {}", jobId, property);
+            if (property != null) {
+                LOGGER.debug("{} is not an ingestion job. job property={}", 
jobId, property);
             }
             return;
         }
+        LOGGER.debug("notified of ingestion job creation {}", jobId);
         EntityId entityId = (EntityId) property;
         monitorJob(jobId, entityId);
-        boolean found = jobId2EntityId.get(jobId) != null;
-        LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? 
"Active" : "Inactive"));
         add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, 
jobSpecification));
     }
 
     private synchronized void monitorJob(JobId jobId, EntityId entityId) {
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "monitorJob was called for job {}", jobId);
-        }
-        boolean found = jobId2EntityId.get(jobId) != null;
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? 
"Active" : "Inactive"));
-        }
+        boolean found = jobId2EntityId.containsKey(jobId);
+        LOGGER.debug("{} was {}", jobId, (found ? "active" : "inactive"));
         if (entityEventListeners.containsKey(entityId)) {
-            if (jobId2EntityId.containsKey(jobId)) {
-                if (LOGGER.isErrorEnabled()) {
-                    LOGGER.error("Job {} is already being monitored", jobId);
-                }
+            if (found) {
+                LOGGER.error("{} is already being monitored", jobId);
                 return;
             }
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "Monitoring started for job {}", jobId);
-            }
         } else {
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.info("No listener was found for the entity {} for job 
{}", entityId, jobId);
-            }
+            LOGGER.debug("no listener found for entity {}; {}", entityId, 
jobId);
         }
         jobId2EntityId.put(jobId, entityId);
     }
@@ -141,22 +123,18 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         if (entityId != null) {
             add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null));
         }
+        // else must be non-active job, e.g. a job for a query
     }
 
     @Override
     public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, 
List<Exception> exceptions)
             throws HyracksException {
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Getting notified of job finish for job {}", 
jobId);
-        }
         EntityId entityId = jobId2EntityId.get(jobId);
         if (entityId != null) {
+            LOGGER.debug("notified of ingestion job finish {}", jobId);
             add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, 
Pair.of(jobStatus, exceptions)));
-        } else {
-            if (LOGGER.isEnabled(level)) {
-                LOGGER.log(level, "No need to notify JOB_FINISHED for job {}", 
jobId);
-            }
         }
+        // else must be non-active job, e.g. a job for a query
     }
 
     // *** IActiveNotificationHandler
@@ -169,13 +147,6 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
 
     @Override
     public IActiveEntityEventsListener getListener(EntityId entityId) {
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "getActiveEntityListener was called with entity 
{}", entityId);
-        }
-        IActiveEntityEventsListener listener = 
entityEventListeners.get(entityId);
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "Listener found: {}", listener);
-        }
         return entityEventListeners.get(entityId);
     }
 
@@ -197,9 +168,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         if (suspended) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "registerListener was called for the entity {}", 
listener.getEntityId());
-        }
+        LOGGER.debug("register listener for entity {}, state={}", 
listener.getEntityId(), listener.getState());
         if (entityEventListeners.containsKey(listener.getEntityId())) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, 
listener.getEntityId());
         }
@@ -211,9 +180,7 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         if (suspended) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED);
         }
-        if (LOGGER.isEnabled(level)) {
-            LOGGER.log(level, "unregisterListener was called for the entity 
{}", listener.getEntityId());
-        }
+        LOGGER.debug("unregister listener for entity {}, state={}", 
listener.getEntityId(), listener.getState());
         IActiveEntityEventsListener registeredListener = 
entityEventListeners.remove(listener.getEntityId());
         if (registeredListener == null) {
             throw new 
RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, 
listener.getEntityId());
@@ -229,9 +196,8 @@ public class ActiveNotificationHandler extends 
SingleThreadEventProcessor<Active
         LOGGER.info("Starting active recovery");
         for (IActiveEntityEventsListener listener : getEventListeners()) {
             synchronized (listener) {
-                if (LOGGER.isEnabled(level)) {
-                    LOGGER.log(level, "Entity {} is {}", 
listener.getEntityId(), listener.getState());
-                }
+                LOGGER.debug("entity {} is {}, active={}, suspended={}", 
listener.getEntityId(), listener.getState(),
+                        listener.isActive(), listener.isSuspended());
                 listener.notifyAll();
             }
         }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
index 65d1039aff..65700413b1 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java
@@ -69,10 +69,8 @@ public class CancelQueryRequest implements 
ICcAddressedMessage {
                 }
             }
         }
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Sending CancelQueryResponse to {}. requestId:{}, 
uuid:{}, contextId:{}, status:{}", nodeId,
-                    requestId, uuid, contextId, status);
-        }
+        LOGGER.debug("sending CancelQueryResponse to {}. reqId:{}, uuid:{}, 
contextId:{}, status:{}", nodeId, requestId,
+                uuid, contextId, status);
         CancelQueryResponse response = new CancelQueryResponse(reqId, status);
         CCMessageBroker messageBroker = (CCMessageBroker) 
appCtx.getServiceContext().getMessageBroker();
         try {
@@ -82,4 +80,9 @@ public class CancelQueryRequest implements 
ICcAddressedMessage {
         }
     }
 
+    @Override
+    public String toString() {
+        return "CancelQueryRequest{from='" + nodeId + "', reqId=" + reqId + ", 
uuid='" + uuid + "', contextId='"
+                + contextId + "'}";
+    }
 }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
index d65ae316ee..68d34303eb 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java
@@ -49,4 +49,8 @@ public class CancelQueryResponse implements 
INcAddressedMessage {
         return status;
     }
 
+    @Override
+    public String toString() {
+        return "CancelQueryResponse{reqId=" + reqId + ", status=" + status + 
'}';
+    }
 }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
index ac87f7e264..ef348fe0a7 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java
@@ -64,16 +64,16 @@ public class SleepDescriptor extends 
AbstractScalarFunctionDynamicDescriptor {
                         final long time = 
ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset);
 
                         try {
-                            if (LOGGER.isInfoEnabled()) {
-                                LOGGER.log(Level.INFO,
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.log(Level.TRACE,
                                         
ctx.getTaskContext().getTaskAttemptId() + " sleeping for " + time + " ms");
                             }
                             Thread.sleep(time);
                         } catch (InterruptedException e) {
                             Thread.currentThread().interrupt();
                         } finally {
-                            if (LOGGER.isInfoEnabled()) {
-                                LOGGER.log(Level.INFO,
+                            if (LOGGER.isTraceEnabled()) {
+                                LOGGER.log(Level.TRACE,
                                         
ctx.getTaskContext().getTaskAttemptId() + " done sleeping for " + time + " ms");
                             }
                         }
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
index b123a5ed2e..ae903d1f71 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java
@@ -77,6 +77,11 @@ public class JobCapacityController implements 
IJobCapacityController {
         ensureMaxCapacity();
     }
 
+    @Override
+    public IReadOnlyClusterCapacity getClusterCapacity() {
+        return resourceManager.getCurrentCapacity();
+    }
+
     private void ensureMaxCapacity() {
         final IClusterCapacity currentCapacity = 
resourceManager.getCurrentCapacity();
         final IReadOnlyClusterCapacity maximumCapacity = 
resourceManager.getMaximumCapacity();
diff --git 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
index 9f4541f582..0c74260d6e 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java
@@ -74,7 +74,7 @@ public abstract class 
AbstractOneInputOneOutputOneFramePushRuntime extends Abstr
     }
 
     protected void flushIfNotFailed() throws HyracksDataException {
-        if (!failed && appender.getTupleCount() > 0) {
+        if (!failed && appender != null && appender.getTupleCount() > 0) {
             flushAndReset();
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
index 9e38a20991..b18bcb10ee 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java
@@ -24,6 +24,11 @@ import org.apache.hyracks.api.job.JobSpecification;
 public class DefaultJobCapacityController implements IJobCapacityController {
 
     public static final DefaultJobCapacityController INSTANCE = new 
DefaultJobCapacityController();
+    private static final IClusterCapacity CAPACITY = new ClusterCapacity();
+    static {
+        CAPACITY.setAggregatedCores(Integer.MAX_VALUE);
+        CAPACITY.setAggregatedMemoryByteSize(Long.MAX_VALUE);
+    }
 
     private DefaultJobCapacityController() {
     }
@@ -37,4 +42,9 @@ public class DefaultJobCapacityController implements 
IJobCapacityController {
     public void release(JobSpecification job) {
         // No operation here.
     }
+
+    @Override
+    public IReadOnlyClusterCapacity getClusterCapacity() {
+        return CAPACITY;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
index 5fa4bd9d83..f88baa2ee9 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java
@@ -57,4 +57,10 @@ public interface IJobCapacityController {
      */
     void release(JobSpecification job);
 
+    /**
+     * The cluster current capacity.
+     *
+     * @return the cluster current capacity.
+     */
+    IReadOnlyClusterCapacity getClusterCapacity();
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3574acda5f..98918508f4 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -539,7 +539,7 @@ public class JobExecutor {
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt,
             TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
-        LOGGER.trace(() -> "Aborting task cluster: " + tcAttempt.getAttempt());
+        LOGGER.trace("Aborting task cluster: {}", tcAttempt.getAttempt());
         Set<TaskAttemptId> abortTaskIds = new HashSet<>();
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>();
         for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) {
@@ -561,14 +561,12 @@ public class JobExecutor {
             }
         }
         final JobId jobId = jobRun.getJobId();
-        LOGGER.trace(() -> "Abort map for job: " + jobId + ": " + abortTaskAttemptMap);
+        LOGGER.trace("Abort map for job: {}: {}", jobId, abortTaskAttemptMap);
         INodeManager nodeManager = ccs.getNodeManager();
         abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> {
             final NodeControllerState node = nodeManager.getNodeControllerState(key);
             if (node != null) {
-                if (LOGGER.isTraceEnabled()) {
-                    LOGGER.trace("Aborting: " + abortTaskAttempts + " at " + key);
-                }
+                LOGGER.trace("Aborting: {} at {}", abortTaskAttempts, key);
                 try {
                     node.getNodeController().abortTasks(jobId, abortTaskAttempts);
                 } catch (Exception e) {
@@ -683,7 +681,6 @@ public class JobExecutor {
      */
     public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
         try {
-            LOGGER.debug("Received failure notification for TaskAttempt " + ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
             TaskCluster tc = ta.getTask().getTaskCluster();
             TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
@@ -696,7 +693,7 @@ public class JobExecutor {
                 LOGGER.trace(() -> "Marking TaskAttempt " + ta.getTaskAttemptId()
                         + " as failed and the number of max re-attempts = " + maxReattempts);
                 if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) {
-                    LOGGER.debug(() -> "Aborting the job of " + ta.getTaskAttemptId());
+                    LOGGER.debug("Aborting the job:{} of {}", jobRun.getJobId(), ta.getTaskAttemptId());
                     abortJob(exceptions, NoOpCallback.INSTANCE);
                     return;
                 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4ac49..ad971887f7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -38,7 +38,9 @@ import org.apache.hyracks.api.job.ActivityClusterGraph;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.IClusterCapacity;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.NodeControllerState;
@@ -129,7 +131,7 @@ public class JobManager implements IJobManager {
         if (activeRunMap.containsKey(jobId)) {
             JobRun jobRun = activeRunMap.get(jobId);
             // The following call will abort all ongoing tasks and then consequently
-            // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job.
+            // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecycle of the job.
             // Therefore, we do not remove the job out of activeRunMap here.
             jobRun.getExecutor().cancelJob(callback);
             return;
@@ -139,7 +141,7 @@ public class JobManager implements IJobManager {
         if (jobRun != null) {
             List<Exception> exceptions =
                     Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId));
-            // Since the job has not been executed, we only need to update its status and lifecyle here.
+            // Since the job has not been executed, we only need to update its status and lifecycle here.
             jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
             runMapArchive.put(jobId, jobRun);
             runMapHistory.put(jobId, exceptions);
@@ -170,7 +172,6 @@ public class JobManager implements IJobManager {
             return;
         }
         if (run.getPendingStatus() != null) {
-            LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId);
             return;
         }
         Set<String> targetNodes = run.getParticipatingNodeIds();
@@ -313,6 +314,7 @@ public class JobManager implements IJobManager {
         run.setStartTime(System.currentTimeMillis());
         run.setStartTimeZoneId(ZoneId.systemDefault().getId());
         JobId jobId = run.getJobId();
+        logJobCapacity(run, "running", Level.DEBUG);
         activeRunMap.put(jobId, run);
         run.setStatus(JobStatus.RUNNING, null);
         executeJobInternal(run);
@@ -320,6 +322,7 @@ public class JobManager implements IJobManager {
 
     // Queue a job when the required capacity for the job is not met.
     private void queueJob(JobRun jobRun) throws HyracksException {
+        logJobCapacity(jobRun, "queueing", Level.INFO);
         jobRun.setStatus(JobStatus.PENDING, null);
         jobQueue.add(jobRun);
     }
@@ -355,5 +358,23 @@ public class JobManager implements IJobManager {
     private void releaseJobCapacity(JobRun jobRun) {
         final JobSpecification job = jobRun.getJobSpecification();
         jobCapacityController.release(job);
+        logJobCapacity(jobRun, "released", Level.DEBUG);
+    }
+
+    private void logJobCapacity(JobRun jobRun, String jobStateDesc, Level lvl) {
+        IClusterCapacity requiredResources = jobRun.getJobSpecification().getRequiredClusterCapacity();
+        if (requiredResources == null) {
+            return;
+        }
+        long requiredMemory = requiredResources.getAggregatedMemoryByteSize();
+        int requiredCPUs = requiredResources.getAggregatedCores();
+        if (requiredMemory == 0 && requiredCPUs == 0) {
+            return;
+        }
+        IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity();
+        LOGGER.log(lvl, "{} {}, memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}",
+                jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs,
+                clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(),
+                getRunningJobsCount(), jobQueue.size());
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index ac29b530d4..b5df59339b 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -174,8 +174,8 @@ public class PartitionMatchMaker {
     }
 
     public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Removing uncommitted partitions: " + partitionIds);
+        if (partitionIds != null && !partitionIds.isEmpty()) {
+            LOGGER.debug("Removing uncommitted partitions {}", partitionIds);
         }
         IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
             @Override
@@ -195,8 +195,8 @@ public class PartitionMatchMaker {
     }
 
     public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Removing partition requests: " + partitionIds);
+        if (partitionIds != null && !partitionIds.isEmpty()) {
+            LOGGER.debug("Removing partition requests {}", partitionIds);
         }
         IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
             @Override
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 9f8a7e24ea..46dd3512af 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -45,7 +45,6 @@ import org.apache.hyracks.api.result.ResultSetMetaData;
 import org.apache.hyracks.control.common.result.AbstractResultManager;
 import org.apache.hyracks.control.common.result.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -79,9 +78,7 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe
 
     @Override
     public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId);
-        }
+        LOGGER.debug("{} notified of new job {}", getClass().getSimpleName(), jobId);
         if (jobResultLocations.get(jobId) != null) {
             throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId);
         }
@@ -157,15 +154,14 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) {
-        Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
-        Level logLevel = Level.DEBUG;
-        if (LOGGER.isEnabled(logLevel)) {
-            LOGGER.log(logLevel, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(), ex);
-        }
         ResultJobRecord rjr = getResultJobRecord(jobId);
+        if (logFailure(rjr)) {
+            LOGGER.debug("job {} failed and is being reported to {}", jobId, getClass().getSimpleName());
+        }
         if (rjr != null) {
             rjr.fail(exceptions);
         }
+        Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
         final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
         if (jobResultInfo != null) {
             jobResultInfo.setException(ex);
@@ -211,6 +207,15 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe
         }
     }
 
+    private static boolean logFailure(ResultJobRecord rjr) {
+        if (rjr == null) {
+            return true;
+        }
+        // don't re-log if the state is already failed
+        ResultJobRecord.Status status = rjr.getStatus();
+        return status == null || status.getState() != State.FAILED;
+    }
+
     /**
      * Compares the records already known by the client for the given job's result set id with the records that the
      * result directory service knows and if there are any newly discovered records returns a whole array with the
@@ -264,7 +269,7 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe
 
 class JobResultInfo {
 
-    private ResultJobRecord record;
+    private final ResultJobRecord record;
     private Waiters waiters;
     private Exception exception;
 
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
index 260c6b94eb..38277c2af7 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java
@@ -121,4 +121,9 @@ public class FIFOJobQueue implements IJobQueue {
     public void clear() {
         jobListMap.clear();
     }
+
+    @Override
+    public int size() {
+        return jobListMap.size();
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
index be40883f60..1f2c29a3cb 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java
@@ -73,4 +73,11 @@ public interface IJobQueue {
      * Clears the job queue
      */
     void clear();
+
+    /**
+     * Returns the number of queued jobs.
+     *
+     * @return the number of queued jobs.
+     */
+    int size();
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
index 771832eeea..6630ba72b4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java
@@ -25,6 +25,7 @@ import org.apache.hyracks.api.deployment.DeploymentId;
 import org.apache.hyracks.api.messages.IMessage;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -34,8 +35,8 @@ import org.apache.logging.log4j.Logger;
 public class ApplicationMessageWork extends AbstractHeartbeatWork {
 
     private static final Logger LOGGER = LogManager.getLogger();
-    private byte[] message;
-    private DeploymentId deploymentId;
+    private final byte[] message;
+    private final DeploymentId deploymentId;
 
     public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId,
             String nodeId) {
@@ -56,6 +57,11 @@ public class ApplicationMessageWork extends AbstractHeartbeatWork {
         }
     }
 
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
+
     @Override
     public String toString() {
         return getName() + ": nodeID: " + nodeId;
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
index c36b887c28..f08e209444 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java
@@ -24,10 +24,12 @@ import org.apache.hyracks.api.client.NodeControllerInfo;
 import org.apache.hyracks.control.cc.cluster.INodeManager;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.logging.log4j.Level;
 
 public class GetNodeControllersInfoWork extends AbstractWork {
+
     private final INodeManager nodeManager;
-    private IResultCallback<Map<String, NodeControllerInfo>> callback;
+    private final IResultCallback<Map<String, NodeControllerInfo>> callback;
 
     public GetNodeControllersInfoWork(INodeManager nodeManager,
             IResultCallback<Map<String, NodeControllerInfo>> callback) {
@@ -39,4 +41,9 @@ public class GetNodeControllersInfoWork extends AbstractWork {
     public void run() {
         callback.setValue(nodeManager.getNodeControllerInfoMap());
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
index bf95ff236b..b7dbd7546e 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java
@@ -22,8 +22,10 @@ import org.apache.hyracks.api.comm.NetworkAddress;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
 
 public class GetResultDirectoryAddressWork extends SynchronizableWork {
+
     private final ClusterControllerService ccs;
 
     private final IResultCallback<NetworkAddress> callback;
@@ -42,4 +44,9 @@ public class GetResultDirectoryAddressWork extends SynchronizableWork {
             callback.setException(e);
         }
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
index d1d22697f9..1e34b96c04 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java
@@ -28,6 +28,7 @@ import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.result.IResultDirectoryService;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
 
 public class GetResultPartitionLocationsWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
@@ -68,4 +69,9 @@ public class GetResultPartitionLocationsWork extends SynchronizableWork {
     public String toString() {
         return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Known@" + Arrays.toString(knownRecords);
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 77d2f82f9c..6fe9909f05 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -28,17 +28,19 @@ import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.common.work.IResultCallback;
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 public class JobCleanupWork extends AbstractWork {
+
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private IJobManager jobManager;
-    private JobId jobId;
-    private JobStatus status;
-    private List<Exception> exceptions;
-    private IResultCallback<Void> callback;
+    private final IJobManager jobManager;
+    private final JobId jobId;
+    private final JobStatus status;
+    private final List<Exception> exceptions;
+    private final IResultCallback<Void> callback;
 
     public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions,
             IResultCallback<Void> callback) {
@@ -51,12 +53,10 @@ public class JobCleanupWork extends AbstractWork {
 
     @Override
     public void run() {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Cleanup for job: {}", jobId);
-        }
+        LOGGER.info("cleaning up {} on NCs, status={}", jobId, status);
         final JobRun jobRun = jobManager.get(jobId);
         if (jobRun == null) {
-            LOGGER.debug("Ignoring cleanup for unknown job: {}", jobId);
+            LOGGER.debug("ignoring cleanup for unknown {}", jobId);
             return;
         }
         try {
@@ -80,4 +80,9 @@ public class JobCleanupWork extends AbstractWork {
         return getName() + ": JobId@" + jobId + " Status@" + status
                 + (exceptions == null ? "" : " Exceptions@" + exceptions);
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
index cfedfc9d02..7606dc9a98 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java
@@ -35,6 +35,7 @@ import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.deployment.DeploymentUtils;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
 
 public class JobStartWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
@@ -85,4 +86,9 @@ public class JobStartWork extends SynchronizableWork {
             callback.setException(e);
         }
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
index c3a09f9da3..76a72c0da3 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java
@@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
+
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final JobId jobId;
@@ -46,6 +47,7 @@ public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
 
     @Override
     public void runWork() {
+        LOGGER.debug("node {} finished job clean-up {}", nodeId, jobId);
         IJobManager jobManager = ccs.getJobManager();
         final JobRun run = jobManager.get(jobId);
         if (run == null) {
@@ -82,4 +84,9 @@ public class JobletCleanupNotificationWork extends AbstractHeartbeatWork {
     public String toString() {
         return getName() + " jobId:" + jobId + ", nodeId:" + nodeId;
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
index ec21785baa..810fda24e4 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java
@@ -31,6 +31,7 @@ import org.apache.hyracks.control.common.controllers.NodeParameters;
 import org.apache.hyracks.control.common.controllers.NodeRegistration;
 import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -83,4 +84,9 @@ public class RegisterNodeWork extends SynchronizableWork {
             nc.sendRegistrationResult(params, e);
         }
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
index ee10669952..9f740ef375 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java
@@ -50,9 +50,7 @@ public class RemoveDeadNodesWork extends AbstractWork {
             Collection<JobId> affectedJobIds = result.getRight();
             int size = affectedJobIds.size();
             if (size > 0) {
-                if (LOGGER.isInfoEnabled()) {
-                    LOGGER.info("Number of affected jobs: " + size);
-                }
+                LOGGER.info("number of affected jobs due to dead nodes removal {}", size);
                 IJobManager jobManager = ccs.getJobManager();
                 for (JobId jobId : affectedJobIds) {
                     JobRun run = jobManager.get(jobId);
@@ -71,6 +69,6 @@ public class RemoveDeadNodesWork extends AbstractWork {
 
     @Override
     public Level logLevel() {
-        return Level.DEBUG;
+        return Level.TRACE;
     }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
index 0c531422ff..80dbd2ea67 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java
@@ -29,8 +29,13 @@ import org.apache.hyracks.control.cc.job.TaskAttempt;
 import org.apache.hyracks.control.common.job.profiling.om.JobProfile;
 import org.apache.hyracks.control.common.job.profiling.om.JobletProfile;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 public class TaskCompleteWork extends AbstractTaskLifecycleWork {
+
+    private static final Logger LOGGER = LogManager.getLogger();
     private final TaskProfile statistics;
 
     public TaskCompleteWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -46,6 +51,7 @@ public class TaskCompleteWork extends AbstractTaskLifecycleWork {
         if (run == null) {
             return;
         }
+        LOGGER.debug("node completed task {}:{}:{}", nodeId, jobId, taId);
         if (statistics != null) {
             JobProfile jobProfile = run.getJobProfile();
             Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles();
@@ -63,4 +69,9 @@ public class TaskCompleteWork extends AbstractTaskLifecycleWork {
     public String toString() {
         return getName() + ": [" + nodeId + "[" + jobId + ":" + taId + "]";
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 833066ea4a..48fd403c56 100644
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,17 +22,14 @@ import java.util.List;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.ErrorMessageUtil;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
-    private static final Logger LOGGER = LogManager.getLogger();
+
     private final List<Exception> exceptions;
 
     public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId,
@@ -43,9 +40,6 @@ public class TaskFailureWork extends AbstractTaskLifecycleWork {
 
     @Override
     protected void performEvent(TaskAttempt ta) {
-        Exception ex = exceptions.get(0);
-        LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN,
-                "Executing task failure work for " + this, ex);
         IJobManager jobManager = ccs.getJobManager();
         JobRun run = jobManager.get(jobId);
         if (run == null) {
@@ -57,6 +51,9 @@ public class TaskFailureWork extends 
AbstractTaskLifecycleWork {
 
     @Override
     public String toString() {
-        return getName() + ": [" + jobId + ":" + taId + ":" + nodeId + "]";
+        // toString() is used when logging this work; never let a null/empty exception list make it throw.
+        return getName() + ": [" + jobId + ":" + taId + ":" + nodeId + "]"
+                + ((exceptions != null && !exceptions.isEmpty())
+                        ? " " + ErrorMessageUtil.getCauseMessage(exceptions.get(0)) : "");
     }
 }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
index ed3e574323..63d5340176 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java
@@ -29,6 +29,7 @@ import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.common.work.IResultCallback;
 import org.apache.hyracks.control.common.work.SynchronizableWork;
+import org.apache.logging.log4j.Level;
 
 public class WaitForJobCompletionWork extends SynchronizableWork {
     private final ClusterControllerService ccs;
@@ -92,4 +93,9 @@ public class WaitForJobCompletionWork extends 
SynchronizableWork {
     public String toString() {
         return getName() + " jobId:" + jobId;
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
index eee8950da4..e52e3acc66 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java
@@ -52,7 +52,6 @@ public class MaterializingPipelinedPartition implements 
IFrameWriter, IPartition
     private boolean failed;
     protected boolean flushRequest;
     private boolean deallocated;
-    private Level openCloseLevel = Level.DEBUG;
     private Thread dataConsumerThread;
 
     public MaterializingPipelinedPartition(IHyracksTaskContext ctx, 
PartitionManager manager, PartitionId pid,
@@ -181,9 +180,6 @@ public class MaterializingPipelinedPartition implements 
IFrameWriter, IPartition
 
     @Override
     public void open() throws HyracksDataException {
-        if (LOGGER.isEnabled(openCloseLevel)) {
-            LOGGER.log(openCloseLevel, "open(" + pid + " by " + taId);
-        }
         size = 0;
         eos = false;
         failed = false;
@@ -215,9 +211,6 @@ public class MaterializingPipelinedPartition implements 
IFrameWriter, IPartition
 
     @Override
     public void close() throws HyracksDataException {
-        if (LOGGER.isEnabled(openCloseLevel)) {
-            LOGGER.log(openCloseLevel, "close(" + pid + " by " + taId);
-        }
         if (writeHandle != null) {
             ctx.getIoManager().close(writeHandle);
         }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
index 6d4f1730b5..f386a89579 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java
@@ -29,11 +29,12 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 public class ApplicationMessageWork extends AbstractWork {
+
     private static final Logger LOGGER = LogManager.getLogger();
-    private byte[] message;
-    private DeploymentId deploymentId;
-    private String nodeId;
-    private NodeControllerService ncs;
+    private final byte[] message;
+    private final DeploymentId deploymentId;
+    private final String nodeId;
+    private final NodeControllerService ncs;
 
     public ApplicationMessageWork(NodeControllerService ncs, byte[] message, 
DeploymentId deploymentId, String nodeId) {
         this.ncs = ncs;
@@ -58,6 +59,11 @@ public class ApplicationMessageWork extends AbstractWork {
         }
     }
 
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
+
     @Override
     public String toString() {
         return getName() + ": nodeId: " + nodeId;
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
index 75edd38b00..2036d7218f 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java
@@ -25,10 +25,12 @@ import org.apache.hyracks.api.job.JobStatus;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.Joblet;
 import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 public class CleanupJobletWork extends AbstractWork {
+
     private static final Logger LOGGER = LogManager.getLogger();
 
     private final NodeControllerService ncs;
@@ -45,7 +47,7 @@ public class CleanupJobletWork extends AbstractWork {
 
     @Override
     public void run() {
-        LOGGER.debug("cleaning up after job: {}", jobId);
+        LOGGER.debug("cleaning up {}, status:{}", jobId, status);
         ncs.removeJobParameterByteStore(jobId);
         ncs.getPartitionManager().jobCompleted(jobId, status);
         Map<JobId, Joblet> jobletMap = ncs.getJobletMap();
@@ -59,4 +61,9 @@ public class CleanupJobletWork extends AbstractWork {
     public String toString() {
         return getName() + " jobId:" + jobId + ", status:" + status;
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
index 60860c5afe..52469dc3b2 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hyracks.control.nc.work;
 
+import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.control.common.job.profiling.om.TaskProfile;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
@@ -38,13 +40,16 @@ public class NotifyTaskCompleteWork extends AbstractWork {
 
     @Override
     public void run() {
-        TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), 
task.getPartitionSendProfile(),
+        JobId jobId = task.getJoblet().getJobId();
+        TaskAttemptId taskAttemptId = task.getTaskAttemptId();
+        LOGGER.debug("notifying CC of task complete {}:{}", jobId, 
taskAttemptId);
+        TaskProfile taskProfile = new TaskProfile(taskAttemptId, 
task.getPartitionSendProfile(),
                 task.getStatsCollector(), task.getWarnings(), 
task.getWarningCollector().getTotalWarningsCount());
         try {
-            
ncs.getClusterController(task.getJobletContext().getJobId().getCcId()).notifyTaskComplete(
-                    task.getJobletContext().getJobId(), 
task.getTaskAttemptId(), ncs.getId(), taskProfile);
+            
ncs.getClusterController(task.getJobletContext().getJobId().getCcId())
+                    .notifyTaskComplete(task.getJobletContext().getJobId(), 
taskAttemptId, ncs.getId(), taskProfile);
         } catch (Exception e) {
-            LOGGER.log(Level.ERROR, "Failed notifying task complete for " + 
task.getTaskAttemptId(), e);
+            LOGGER.log(Level.ERROR, "Failed notifying task complete for {}", 
taskAttemptId, e);
         }
         task.getJoblet().removeTask(task);
     }
@@ -54,4 +59,9 @@ public class NotifyTaskCompleteWork extends AbstractWork {
         return getName() + ": [" + ncs.getId() + "[" + 
task.getJoblet().getJobId() + ":" + task.getTaskAttemptId()
                 + "]";
     }
+
+    @Override
+    public Level logLevel() {
+        return Level.TRACE;
+    }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index b0c60aa3d3..cd79da756c 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -23,7 +23,7 @@ import java.util.List;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.ExceptionUtils;
+import org.apache.hyracks.api.util.ErrorMessageUtil;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
@@ -50,9 +50,6 @@ public class NotifyTaskFailureWork extends AbstractWork {
 
     @Override
     public void run() {
-        Exception ex = exceptions.get(0);
-        LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : 
Level.WARN, "task " + taskId + " has failed",
-                ex);
         try {
             IResultPartitionManager resultPartitionManager = 
ncs.getResultPartitionManager();
             if (resultPartitionManager != null) {
@@ -69,6 +66,8 @@ public class NotifyTaskFailureWork extends AbstractWork {
 
     @Override
     public String toString() {
-        return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + 
"]";
+        return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + 
"]"
+                + ((exceptions != null && !exceptions.isEmpty())
+                        ? " " + 
ErrorMessageUtil.getCauseMessage(exceptions.get(0)) : "");
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
index f277046496..dd4a956bd3 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java
@@ -318,6 +318,6 @@ public class StartTasksWork extends AbstractWork {
 
     @Override
     public String toString() {
-        return getName() + " jobId:" + jobId;
+        return getName() + " jobId:" + jobId + " tasks:" + 
taskDescriptors.size();
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index 555e8fbfa3..0f31491feb 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -319,9 +319,6 @@ public class OptimizedHybridHashJoinOperatorDescriptor 
extends AbstractOperatorD
                         if (!failed) {
                             state.hybridHJ.closeBuild();
                             ctx.setStateObject(state);
-                            if (LOGGER.isTraceEnabled()) {
-                                LOGGER.trace("OptimizedHybridHashJoin closed 
its build phase");
-                            }
                         } else {
                             state.hybridHJ.clearBuildTempFiles();
                         }
@@ -402,10 +399,6 @@ public class OptimizedHybridHashJoinOperatorDescriptor 
extends AbstractOperatorD
 
                     writer.open();
                     state.hybridHJ.initProbe(probComp);
-
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("OptimizedHybridHashJoin is starting the 
probe phase.");
-                    }
                 }
 
                 @Override
@@ -416,7 +409,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor 
extends AbstractOperatorD
                 @Override
                 public void fail() throws HyracksDataException {
                     failed = true;
-                    if (state.hybridHJ != null) {
+                    if (state != null && state.hybridHJ != null) {
                         state.hybridHJ.fail();
                     }
                     writer.fail();
@@ -427,12 +420,13 @@ public class OptimizedHybridHashJoinOperatorDescriptor 
extends AbstractOperatorD
                     if (failed) {
                         try {
                             // Clear temp files if fail() was called.
-                            state.hybridHJ.clearBuildTempFiles();
-                            state.hybridHJ.clearProbeTempFiles();
+                            if (state != null && state.hybridHJ != null) {
+                                state.hybridHJ.clearBuildTempFiles();
+                                state.hybridHJ.clearProbeTempFiles();
+                            }
                         } finally {
                             writer.close(); // writer should always be closed.
                         }
-                        logProbeComplete();
                         return;
                     }
                     try {
@@ -477,17 +471,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor 
extends AbstractOperatorD
                         // Re-throw the whatever is caught.
                         throw e;
                     } finally {
-                        try {
-                            logProbeComplete();
-                        } finally {
-                            writer.close();
-                        }
-                    }
-                }
-
-                private void logProbeComplete() {
-                    if (LOGGER.isDebugEnabled()) {
-                        LOGGER.debug("OptimizedHybridHashJoin closed its probe 
phase");
+                        writer.close();
                     }
                 }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
index 08f15b3669..a1704ec077 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java
@@ -41,13 +41,9 @@ import 
org.apache.hyracks.dataflow.std.structures.IResetableComparable;
 import org.apache.hyracks.dataflow.std.structures.IResetableComparableFactory;
 import org.apache.hyracks.dataflow.std.structures.MaxHeap;
 import org.apache.hyracks.dataflow.std.structures.TuplePointer;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 public class TupleSorterHeapSort implements ITupleSorter {
 
-    private static final Logger LOGGER = LogManager.getLogger();
-
     class HeapEntryFactory implements IResetableComparableFactory<HeapEntry> {
         @Override
         public IResetableComparable<HeapEntry> createResetableComparable() {
@@ -288,7 +284,6 @@ public class TupleSorterHeapSort implements ITupleSorter {
         int maxFrameSize = outputFrame.getFrameSize();
         int numEntries = heap.getNumEntries();
         IResetableComparable[] entries = heap.getEntries();
-        int io = 0;
         for (int i = 0; i < numEntries; i++) {
             HeapEntry minEntry = (HeapEntry) entries[i];
             bufferAccessor1.reset(minEntry.tuplePointer);
@@ -296,14 +291,10 @@ public class TupleSorterHeapSort implements ITupleSorter {
                     bufferAccessor1.getTupleStartOffset(), 
bufferAccessor1.getTupleLength());
             if (flushed > 0) {
                 maxFrameSize = Math.max(maxFrameSize, flushed);
-                io++;
             }
         }
         maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize());
         outputAppender.write(writer, true);
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Flushed records:" + numEntries + "; Flushed through " 
+ (io + 1) + " frames");
-        }
         return maxFrameSize;
     }
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
index be22b9ca0d..7a75a0fcca 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java
@@ -34,7 +34,9 @@ import org.apache.hyracks.api.job.JobFlag;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.job.resource.ClusterCapacity;
 import org.apache.hyracks.api.job.resource.IJobCapacityController;
+import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity;
 import org.apache.hyracks.api.result.IResultSet;
 import org.apache.hyracks.api.result.IResultSetReader;
 import org.apache.hyracks.client.result.ResultSet;
@@ -254,6 +256,14 @@ public abstract class AbstractMultiNCIntegrationTest {
                 public void release(JobSpecification job) {
 
                 }
+
+                @Override
+                public IReadOnlyClusterCapacity getClusterCapacity() {
+                    ClusterCapacity clusterCapacity = new ClusterCapacity();
+                    clusterCapacity.setAggregatedMemoryByteSize(maxRAM);
+                    clusterCapacity.setAggregatedCores(Integer.MAX_VALUE);
+                    return clusterCapacity;
+                }
             };
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
index 1ff9fa8b25..842ec61527 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java
@@ -34,7 +34,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
 import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
 import org.apache.hyracks.storage.common.buffercache.IBufferCache;
 import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -80,11 +79,7 @@ public class ComponentUtils {
      */
     public static void get(ILSMIndex index, IValueReference key, 
ArrayBackedValueStorage value)
             throws HyracksDataException {
-        boolean loggable = LOGGER.isDebugEnabled();
         value.reset();
-        if (loggable) {
-            LOGGER.log(Level.DEBUG, "Getting " + key + " from index " + index);
-        }
         // Lock the opTracker to ensure index components don't change
         synchronized (index.getOperationTracker()) {
             ILSMMemoryComponent cmc = index.getCurrentMemoryComponent();
@@ -92,33 +87,17 @@ public class ComponentUtils {
                 index.getCurrentMemoryComponent().getMetadata().get(key, 
value);
             }
             if (value.getLength() == 0) {
-                if (loggable) {
-                    LOGGER.log(Level.DEBUG, key + " was not found in mutable 
memory component of " + index);
-                }
-                // was not found in the in current mutable component, search 
in the other in memory components
+                // was not found in the current mutable component, search 
in the other in-memory components
                 fromImmutableMemoryComponents(index, key, value);
                 if (value.getLength() == 0) {
-                    if (loggable) {
-                        LOGGER.log(Level.DEBUG, key + " was not found in all 
immmutable memory components of " + index);
-                    }
-                    // was not found in the in all in memory components, 
search in the disk components
+                    // was not found in all in-memory components, search in 
the disk components
                     fromDiskComponents(index, key, value);
-                    if (loggable) {
-                        if (value.getLength() == 0) {
-                            LOGGER.log(Level.DEBUG, key + " was not found in 
all disk components of " + index);
-                        } else {
-                            LOGGER.log(Level.DEBUG, key + " was found in disk 
components of " + index);
-                        }
-                    }
-                } else {
-                    if (loggable) {
-                        LOGGER.log(Level.DEBUG, key + " was found in the 
immutable memory components of " + index);
+                    if (value.getLength() == 0) {
+                        LOGGER.debug("{} was NOT found", key);
                     }
                 }
             } else {
-                if (loggable) {
-                    LOGGER.log(Level.DEBUG, key + " was found in mutable 
memory component of " + index);
-                }
+                LOGGER.debug("{} was found in mutable memory component {}", 
key, cmc);
             }
         }
     }
@@ -143,17 +122,11 @@ public class ComponentUtils {
 
     private static void fromDiskComponents(ILSMIndex index, IValueReference 
key, ArrayBackedValueStorage value)
             throws HyracksDataException {
-        boolean loggable = LOGGER.isDebugEnabled();
-        if (loggable) {
-            LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components 
of " + index);
-        }
         for (ILSMDiskComponent c : index.getDiskComponents()) {
-            if (loggable) {
-                LOGGER.log(Level.DEBUG, "Getting " + key + " from disk 
components " + c);
-            }
             c.getMetadata().get(key, value);
             if (value.getLength() != 0) {
                 // Found
+                LOGGER.debug("{} was found in disk component {}", key, c);
                 return;
             }
         }
@@ -161,21 +134,10 @@ public class ComponentUtils {
 
     private static void fromImmutableMemoryComponents(ILSMIndex index, 
IValueReference key,
             ArrayBackedValueStorage value) throws HyracksDataException {
-        boolean loggable = LOGGER.isDebugEnabled();
-        if (loggable) {
-            LOGGER.log(Level.DEBUG, "Getting " + key + " from immutable memory 
components of " + index);
-        }
         List<ILSMMemoryComponent> memComponents = index.getMemoryComponents();
         int numOtherMemComponents = memComponents.size() - 1;
         int next = index.getCurrentMemoryComponentIndex();
-        if (loggable) {
-            LOGGER.log(Level.DEBUG, index + " has " + numOtherMemComponents + 
" immutable memory components");
-        }
         for (int i = 0; i < numOtherMemComponents; i++) {
-            if (loggable) {
-                LOGGER.log(Level.DEBUG,
-                        "trying to get " + key + " from immutable memory 
components number: " + (i + 1));
-            }
             next = next - 1;
             if (next < 0) {
                 next = memComponents.size() - 1;
@@ -185,6 +147,7 @@ public class ComponentUtils {
                 c.getMetadata().get(key, value);
                 if (value.getLength() != 0) {
                     // Found
+                    LOGGER.debug("{} was found in immutable memory component 
{}", key, c);
                     return;
                 }
             }


Reply via email to