This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit f124592d2be9dfc37b871c2bcd34b30bc5980902 Author: Ali Alsuliman <[email protected]> AuthorDate: Sat Jun 8 03:05:28 2024 +0300 [NO ISSUE][OTH] Logging enhancements + query job logging - user model changes: no - storage format changes: no - interface changes: yes Details: - Reduce the logging level of some JobWork classes to TRACE. - Remove unhelpful logs. - Add context to cancel request/response messages. - Avoid unnecessary NPEs when closing the pipeline, which caused unneeded stack traces in logs. - Add a method to get the job queue size. - Add a method to IJobCapacityController to get the cluster's current capacity for logging. This patch includes backports from: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18318 https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18016 https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18237 Change-Id: I0389e693493d99a12483b94c50eeb1697f69515f Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18353 Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Jenkins <[email protected]> --- .../org/apache/asterix/active/ActiveManager.java | 3 +- .../app/active/ActiveEntityEventsListener.java | 67 +++++++-------------- .../app/active/ActiveNotificationHandler.java | 70 ++++++---------------- .../asterix/app/message/CancelQueryRequest.java | 11 ++-- .../asterix/app/message/CancelQueryResponse.java | 4 ++ .../evaluators/functions/SleepDescriptor.java | 8 +-- .../job/resource/JobCapacityController.java | 5 ++ ...stractOneInputOneOutputOneFramePushRuntime.java | 2 +- .../job/resource/DefaultJobCapacityController.java | 10 ++++ .../api/job/resource/IJobCapacityController.java | 6 ++ .../hyracks/control/cc/executor/JobExecutor.java | 11 ++-- .../apache/hyracks/control/cc/job/JobManager.java | 27 ++++++++- .../control/cc/partitions/PartitionMatchMaker.java | 8 +-- .../control/cc/result/ResultDirectoryService.java | 25 ++++---- .../hyracks/control/cc/scheduler/FIFOJobQueue.java | 5 ++ .../hyracks/control/cc/scheduler/IJobQueue.java | 7 +++ 
.../control/cc/work/ApplicationMessageWork.java | 10 +++- .../cc/work/GetNodeControllersInfoWork.java | 9 ++- .../cc/work/GetResultDirectoryAddressWork.java | 7 +++ .../cc/work/GetResultPartitionLocationsWork.java | 6 ++ .../hyracks/control/cc/work/JobCleanupWork.java | 23 ++++--- .../hyracks/control/cc/work/JobStartWork.java | 6 ++ .../cc/work/JobletCleanupNotificationWork.java | 7 +++ .../hyracks/control/cc/work/RegisterNodeWork.java | 6 ++ .../control/cc/work/RemoveDeadNodesWork.java | 6 +- .../hyracks/control/cc/work/TaskCompleteWork.java | 11 ++++ .../hyracks/control/cc/work/TaskFailureWork.java | 13 ++-- .../control/cc/work/WaitForJobCompletionWork.java | 6 ++ .../MaterializingPipelinedPartition.java | 7 --- .../control/nc/work/ApplicationMessageWork.java | 14 +++-- .../hyracks/control/nc/work/CleanupJobletWork.java | 9 ++- .../control/nc/work/NotifyTaskCompleteWork.java | 18 ++++-- .../control/nc/work/NotifyTaskFailureWork.java | 9 ++- .../hyracks/control/nc/work/StartTasksWork.java | 2 +- .../OptimizedHybridHashJoinOperatorDescriptor.java | 28 ++------- .../dataflow/std/sort/TupleSorterHeapSort.java | 9 --- .../AbstractMultiNCIntegrationTest.java | 10 ++++ .../storage/am/lsm/common/util/ComponentUtils.java | 51 +++------------- 38 files changed, 283 insertions(+), 253 deletions(-) diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index b99e4f2aec..ed481bc666 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -104,9 +104,9 @@ public class ActiveManager { } public void submit(ActiveManagerMessage message) throws HyracksDataException { - LOGGER.debug("Message of type {} received in {}", message.getKind(), nodeId); switch (message.getKind()) { case STOP_ACTIVITY: + LOGGER.debug("Message of type 
{} received in {}", message.getKind(), nodeId); stopRuntime(message); break; case REQUEST_STATS: @@ -151,7 +151,6 @@ public class ActiveManager { return; } String stats = runtime.getStats(); - LOGGER.debug("Sending stats response for {} ", runtimeId); ActiveStatsResponse response = new ActiveStatsResponse(reqId, stats, null); ((NodeControllerService) serviceCtx.getControllerService()).sendRealTimeApplicationMessageToCC( message.getCcId(), JavaSerializationUtils.serialize(response), null); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 626b93828e..7b253f1544 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -138,7 +138,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl } protected synchronized void setState(ActivityState newState) { - LOGGER.log(level, "State of {} is being set to {} from {}", getEntityId(), newState, state); + LOGGER.log(level, "state of {} is being set from {} to {}", getEntityId(), state, newState); this.prevState = state; this.state = newState; if (newState == ActivityState.STARTING || newState == ActivityState.RECOVERING @@ -153,9 +153,8 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void notify(ActiveEvent event) { try { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "EventListener is notified."); - } + LOGGER.debug("CC handling event {}; state={}, prev state={}, suspended={}", event, state, prevState, + suspended); ActiveEvent.Kind eventKind = event.getEventKind(); switch (eventKind) { case JOB_CREATED: @@ -194,26 +193,21 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl 
@SuppressWarnings("unchecked") protected void finish(ActiveEvent event) throws HyracksDataException { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Active job {} finished", jobId); - } JobId lastJobId = jobId; + Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); if (numRegistered != numDeRegistered) { LOGGER.log(Level.WARN, - "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId, - numRegistered, numDeRegistered); + "ingestion job {} finished with status={}, reported runtime registrations={}, deregistrations={}", + jobId, status, numRegistered, numDeRegistered); } jobId = null; - Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); JobStatus jobStatus = status.getLeft(); List<Exception> exceptions = status.getRight(); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus); - } + LOGGER.debug("ingestion job {} finished with status {}", lastJobId, jobStatus); if (!jobSuccessfullyTerminated(jobStatus)) { jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) : exceptions.get(0); - LOGGER.error("Active Job {} failed", lastJobId, jobFailure); + LOGGER.error("ingestion job {} failed", lastJobId, jobFailure); setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? 
ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED); if (prevState == ActivityState.RUNNING) { @@ -371,16 +365,14 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void recover() { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Recover is called on {}", entityId); - } if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { - LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); + LOGGER.debug("recover is called on {} w/o recovery policy; setting to permanent failure", entityId); setState(ActivityState.STOPPED); } else { + LOGGER.debug("recover is called on {}", entityId); ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor(); setState(ActivityState.TEMPORARILY_FAILED); - LOGGER.log(level, "Recovery task has been submitted"); + LOGGER.debug("recovery task has been submitted"); rt = createRecoveryTask(); executor.submit(rt.recover()); } @@ -479,15 +471,11 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl // Note: once we start sending stop messages, we can't go back until the entity is stopped final String nameBefore = Thread.currentThread().getName(); try { - Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); + Thread.currentThread().setName(nameBefore + " : wait-for-ingestion-completion: " + jobId); sendStopMessages(metadataProvider, timeout, unit); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Waiting for its state to become " + waitFor); - } + LOGGER.debug("waiting for {} to become {}", jobId, waitFor); subscriber.sync(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Disconnect has been completed " + waitFor); - } + LOGGER.debug("disconnect has been completed {}", waitFor); } catch (InterruptedException ie) { forceStop(subscriber, ie); Thread.currentThread().interrupt(); @@ -513,13 +501,8 @@ public abstract class ActiveEntityEventsListener 
implements IActiveEntityControl ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker(); AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations(); int partition = 0; - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations); - } + LOGGER.log(Level.INFO, "sending stop messages to {}", runtimeLocations); for (String location : runtimeLocations.getLocations()) { - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, "Sending to " + location); - } ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++); messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, runtimeId, new StopRuntimeParameters(timeout, unit)), location); @@ -581,14 +564,10 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl WaitForStateSubscriber subscriber; Future<Void> suspendTask; synchronized (this) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "suspending entity " + entityId); - LOGGER.log(level, "Waiting for ongoing activities"); - } + LOGGER.log(level, "{} suspending entity {}", jobId, entityId); + LOGGER.log(level, "{} waiting for ongoing activities", jobId); waitForNonTransitionState(); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Proceeding with suspension. Current state is " + state); - } + LOGGER.log(level, "{} proceeding with suspension. 
current state is {}", jobId, state); if (state == ActivityState.STOPPED) { suspended = true; return; @@ -609,12 +588,12 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl doSuspend(metadataProvider); return null; }); - LOGGER.log(level, "Suspension task has been submitted"); + LOGGER.log(level, "{} suspension task has been submitted", jobId); } try { - LOGGER.log(level, "Waiting for suspension task to complete"); + LOGGER.log(level, "{} waiting for suspension task to complete", jobId); suspendTask.get(); - LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED"); + LOGGER.log(level, "{} waiting for state to become SUSPENDED or TEMPORARILY_FAILED", jobId); subscriber.sync(); suspended = true; } catch (Exception e) { @@ -736,7 +715,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public String toString() { - return "{\"class\":\"" + getClass().getSimpleName() + "\"," + "\"entityId\":\"" + entityId + "\"," - + "\"state\":\"" + state + "\"" + "}"; + return "{\"class\":\"" + getClass().getSimpleName() + "\", \"entityId\":\"" + entityId + "\", \"state\":\"" + + state + "\", \"prev state\":\"" + prevState + "\", \"suspended\":" + suspended + "}"; } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 3c277d5aa6..662884d48b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -72,16 +72,14 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active EntityId entityId = jobId2EntityId.get(jobId); if (entityId != null) { IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - if (LOGGER.isEnabled(level)) { - 
LOGGER.log(level, "Next event is {} for job {}", eventKind, jobId); - } if (eventKind == Kind.JOB_FINISHED) { - LOGGER.log(level, "Removing job {}", jobId); + LOGGER.debug("removing ingestion job {}", jobId); jobId2EntityId.remove(jobId); } if (listener != null) { - LOGGER.log(level, "Notifying the listener"); listener.notify(event); + } else { + LOGGER.debug("listener not found for entity {} on event={}", entityId, event); } } else { LOGGER.log(Level.ERROR, "Entity not found for event {} for job {}", eventKind, jobId); @@ -92,45 +90,29 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "notifyJobCreation was called for job {}", jobId); - } Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); if (!(property instanceof EntityId)) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property); + if (property != null) { + LOGGER.debug("{} is not an ingestion job. job property={}", jobId, property); } return; } + LOGGER.debug("notified of ingestion job creation {}", jobId); EntityId entityId = (EntityId) property; monitorJob(jobId, entityId); - boolean found = jobId2EntityId.get(jobId) != null; - LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive")); add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification)); } private synchronized void monitorJob(JobId jobId, EntityId entityId) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "monitorJob was called for job {}", jobId); - } - boolean found = jobId2EntityId.get(jobId) != null; - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? 
"Active" : "Inactive")); - } + boolean found = jobId2EntityId.containsKey(jobId); + LOGGER.debug("{} was {}", jobId, (found ? "active" : "inactive")); if (entityEventListeners.containsKey(entityId)) { - if (jobId2EntityId.containsKey(jobId)) { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("Job {} is already being monitored", jobId); - } + if (found) { + LOGGER.error("{} is already being monitored", jobId); return; } - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Monitoring started for job {}", jobId); - } } else { - if (LOGGER.isEnabled(level)) { - LOGGER.info("No listener was found for the entity {} for job {}", entityId, jobId); - } + LOGGER.debug("no listener found for entity {}; {}", entityId, jobId); } jobId2EntityId.put(jobId, entityId); } @@ -141,22 +123,18 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (entityId != null) { add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null)); } + // else must be non-active job, e.g. a job for a query } @Override public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Getting notified of job finish for job {}", jobId); - } EntityId entityId = jobId2EntityId.get(jobId); if (entityId != null) { + LOGGER.debug("notified of ingestion job finish {}", jobId); add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions))); - } else { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "No need to notify JOB_FINISHED for job {}", jobId); - } } + // else must be non-active job, e.g. 
a job for a query } // *** IActiveNotificationHandler @@ -169,13 +147,6 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public IActiveEntityEventsListener getListener(EntityId entityId) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "getActiveEntityListener was called with entity {}", entityId); - } - IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Listener found: {}", listener); - } return entityEventListeners.get(entityId); } @@ -197,9 +168,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "registerListener was called for the entity {}", listener.getEntityId()); - } + LOGGER.debug("register listener for entity {}, state={}", listener.getEntityId(), listener.getState()); if (entityEventListeners.containsKey(listener.getEntityId())) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId()); } @@ -211,9 +180,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "unregisterListener was called for the entity {}", listener.getEntityId()); - } + LOGGER.debug("unregister listener for entity {}, state={}", listener.getEntityId(), listener.getState()); IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId()); if (registeredListener == null) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, listener.getEntityId()); @@ -229,9 +196,8 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active LOGGER.info("Starting active 
recovery"); for (IActiveEntityEventsListener listener : getEventListeners()) { synchronized (listener) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Entity {} is {}", listener.getEntityId(), listener.getState()); - } + LOGGER.debug("entity {} is {}, active={}, suspended={}", listener.getEntityId(), listener.getState(), + listener.isActive(), listener.isSuspended()); listener.notifyAll(); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java index 65d1039aff..65700413b1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java @@ -69,10 +69,8 @@ public class CancelQueryRequest implements ICcAddressedMessage { } } } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Sending CancelQueryResponse to {}. requestId:{}, uuid:{}, contextId:{}, status:{}", nodeId, - requestId, uuid, contextId, status); - } + LOGGER.debug("sending CancelQueryResponse to {}. 
reqId:{}, uuid:{}, contextId:{}, status:{}", nodeId, requestId, + uuid, contextId, status); CancelQueryResponse response = new CancelQueryResponse(reqId, status); CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); try { @@ -82,4 +80,9 @@ public class CancelQueryRequest implements ICcAddressedMessage { } } + @Override + public String toString() { + return "CancelQueryRequest{from='" + nodeId + "', reqId=" + reqId + ", uuid='" + uuid + "', contextId='" + + contextId + "'}"; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java index d65ae316ee..68d34303eb 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java @@ -49,4 +49,8 @@ public class CancelQueryResponse implements INcAddressedMessage { return status; } + @Override + public String toString() { + return "CancelQueryResponse{reqId=" + reqId + ", status=" + status + '}'; + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java index ac87f7e264..ef348fe0a7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java @@ -64,16 +64,16 @@ public class SleepDescriptor extends AbstractScalarFunctionDynamicDescriptor { final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset); try { - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, + if (LOGGER.isTraceEnabled()) { + LOGGER.log(Level.TRACE, 
ctx.getTaskContext().getTaskAttemptId() + " sleeping for " + time + " ms"); } Thread.sleep(time); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, + if (LOGGER.isTraceEnabled()) { + LOGGER.log(Level.TRACE, ctx.getTaskContext().getTaskAttemptId() + " done sleeping for " + time + " ms"); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java index b123a5ed2e..ae903d1f71 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java @@ -77,6 +77,11 @@ public class JobCapacityController implements IJobCapacityController { ensureMaxCapacity(); } + @Override + public IReadOnlyClusterCapacity getClusterCapacity() { + return resourceManager.getCurrentCapacity(); + } + private void ensureMaxCapacity() { final IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity(); final IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity(); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java index 9f4541f582..0c74260d6e 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java +++ 
b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java @@ -74,7 +74,7 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr } protected void flushIfNotFailed() throws HyracksDataException { - if (!failed && appender.getTupleCount() > 0) { + if (!failed && appender != null && appender.getTupleCount() > 0) { flushAndReset(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java index 9e38a20991..b18bcb10ee 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java @@ -24,6 +24,11 @@ import org.apache.hyracks.api.job.JobSpecification; public class DefaultJobCapacityController implements IJobCapacityController { public static final DefaultJobCapacityController INSTANCE = new DefaultJobCapacityController(); + private static final IClusterCapacity CAPACITY = new ClusterCapacity(); + static { + CAPACITY.setAggregatedCores(Integer.MAX_VALUE); + CAPACITY.setAggregatedMemoryByteSize(Long.MAX_VALUE); + } private DefaultJobCapacityController() { } @@ -37,4 +42,9 @@ public class DefaultJobCapacityController implements IJobCapacityController { public void release(JobSpecification job) { // No operation here. 
} + + @Override + public IReadOnlyClusterCapacity getClusterCapacity() { + return CAPACITY; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java index 5fa4bd9d83..f88baa2ee9 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java @@ -57,4 +57,10 @@ public interface IJobCapacityController { */ void release(JobSpecification job); + /** + * The cluster current capacity. + * + * @return the cluster current capacity. + */ + IReadOnlyClusterCapacity getClusterCapacity(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java index 3574acda5f..98918508f4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java @@ -539,7 +539,7 @@ public class JobExecutor { private void abortTaskCluster(TaskClusterAttempt tcAttempt, TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) { - LOGGER.trace(() -> "Aborting task cluster: " + tcAttempt.getAttempt()); + LOGGER.trace("Aborting task cluster: {}", tcAttempt.getAttempt()); Set<TaskAttemptId> abortTaskIds = new HashSet<>(); Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>(); for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) { @@ -561,14 +561,12 @@ public class JobExecutor { 
} } final JobId jobId = jobRun.getJobId(); - LOGGER.trace(() -> "Abort map for job: " + jobId + ": " + abortTaskAttemptMap); + LOGGER.trace("Abort map for job: {}: {}", jobId, abortTaskAttemptMap); INodeManager nodeManager = ccs.getNodeManager(); abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> { final NodeControllerState node = nodeManager.getNodeControllerState(key); if (node != null) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("Aborting: " + abortTaskAttempts + " at " + key); - } + LOGGER.trace("Aborting: {} at {}", abortTaskAttempts, key); try { node.getNodeController().abortTasks(jobId, abortTaskAttempts); } catch (Exception e) { @@ -683,7 +681,6 @@ public class JobExecutor { */ public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) { try { - LOGGER.debug("Received failure notification for TaskAttempt " + ta.getTaskAttemptId()); TaskAttemptId taId = ta.getTaskAttemptId(); TaskCluster tc = ta.getTask().getTaskCluster(); TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc); @@ -696,7 +693,7 @@ public class JobExecutor { LOGGER.trace(() -> "Marking TaskAttempt " + ta.getTaskAttemptId() + " as failed and the number of max re-attempts = " + maxReattempts); if (lastAttempt.getAttempt() >= maxReattempts || isCancelled()) { - LOGGER.debug(() -> "Aborting the job of " + ta.getTaskAttemptId()); + LOGGER.debug("Aborting the job:{} of {}", jobRun.getJobId(), ta.getTaskAttemptId()); abortJob(exceptions, NoOpCallback.INSTANCE); return; } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 4882f4ac49..ad971887f7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -38,7 +38,9 @@ import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IClusterCapacity; import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; @@ -129,7 +131,7 @@ public class JobManager implements IJobManager { if (activeRunMap.containsKey(jobId)) { JobRun jobRun = activeRunMap.get(jobId); // The following call will abort all ongoing tasks and then consequently - // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecyle of the job. + // trigger JobCleanupWork and JobCleanupNotificationWork which will update the lifecycle of the job. // Therefore, we do not remove the job out of activeRunMap here. jobRun.getExecutor().cancelJob(callback); return; @@ -139,7 +141,7 @@ public class JobManager implements IJobManager { if (jobRun != null) { List<Exception> exceptions = Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, jobId)); - // Since the job has not been executed, we only need to update its status and lifecyle here. + // Since the job has not been executed, we only need to update its status and lifecycle here. 
jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions); runMapArchive.put(jobId, jobRun); runMapHistory.put(jobId, exceptions); @@ -170,7 +172,6 @@ public class JobManager implements IJobManager { return; } if (run.getPendingStatus() != null) { - LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", run::getJobId); return; } Set<String> targetNodes = run.getParticipatingNodeIds(); @@ -313,6 +314,7 @@ public class JobManager implements IJobManager { run.setStartTime(System.currentTimeMillis()); run.setStartTimeZoneId(ZoneId.systemDefault().getId()); JobId jobId = run.getJobId(); + logJobCapacity(run, "running", Level.DEBUG); activeRunMap.put(jobId, run); run.setStatus(JobStatus.RUNNING, null); executeJobInternal(run); @@ -320,6 +322,7 @@ public class JobManager implements IJobManager { // Queue a job when the required capacity for the job is not met. private void queueJob(JobRun jobRun) throws HyracksException { + logJobCapacity(jobRun, "queueing", Level.INFO); jobRun.setStatus(JobStatus.PENDING, null); jobQueue.add(jobRun); } @@ -355,5 +358,23 @@ public class JobManager implements IJobManager { private void releaseJobCapacity(JobRun jobRun) { final JobSpecification job = jobRun.getJobSpecification(); jobCapacityController.release(job); + logJobCapacity(jobRun, "released", Level.DEBUG); + } + + private void logJobCapacity(JobRun jobRun, String jobStateDesc, Level lvl) { + IClusterCapacity requiredResources = jobRun.getJobSpecification().getRequiredClusterCapacity(); + if (requiredResources == null) { + return; + } + long requiredMemory = requiredResources.getAggregatedMemoryByteSize(); + int requiredCPUs = requiredResources.getAggregatedCores(); + if (requiredMemory == 0 && requiredCPUs == 0) { + return; + } + IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity(); + LOGGER.log(lvl, "{} {}, memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}", + jobStateDesc, jobRun.getJobId(), 
requiredMemory, requiredCPUs, + clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(), + getRunningJobsCount(), jobQueue.size()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java index ac29b530d4..b5df59339b 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java @@ -174,8 +174,8 @@ public class PartitionMatchMaker { } public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Removing uncommitted partitions: " + partitionIds); + if (partitionIds != null && !partitionIds.isEmpty()) { + LOGGER.debug("Removing uncommitted partitions {}", partitionIds); } IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() { @Override @@ -195,8 +195,8 @@ public class PartitionMatchMaker { } public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Removing partition requests: " + partitionIds); + if (partitionIds != null && !partitionIds.isEmpty()) { + LOGGER.debug("Removing partition requests {}", partitionIds); } IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() { @Override diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java index 9f8a7e24ea..46dd3512af 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java @@ -45,7 +45,6 @@ import org.apache.hyracks.api.result.ResultSetMetaData; import org.apache.hyracks.control.common.result.AbstractResultManager; import org.apache.hyracks.control.common.result.ResultStateSweeper; import org.apache.hyracks.control.common.work.IResultCallback; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -79,9 +78,7 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe @Override public synchronized void notifyJobCreation(JobId jobId, JobSpecification spec) throws HyracksException { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(getClass().getSimpleName() + " notified of new job " + jobId); - } + LOGGER.debug("{} notified of new job {}", getClass().getSimpleName(), jobId); if (jobResultLocations.get(jobId) != null) { throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId); } @@ -157,15 +154,14 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe @Override public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) { - Exception ex = exceptions.isEmpty() ? 
null : exceptions.get(0); - Level logLevel = Level.DEBUG; - if (LOGGER.isEnabled(logLevel)) { - LOGGER.log(logLevel, "job " + jobId + " failed and is being reported to " + getClass().getSimpleName(), ex); - } ResultJobRecord rjr = getResultJobRecord(jobId); + if (logFailure(rjr)) { + LOGGER.debug("job {} failed and is being reported to {}", jobId, getClass().getSimpleName()); + } if (rjr != null) { rjr.fail(exceptions); } + Exception ex = exceptions.isEmpty() ? null : exceptions.get(0); final JobResultInfo jobResultInfo = jobResultLocations.get(jobId); if (jobResultInfo != null) { jobResultInfo.setException(ex); @@ -211,6 +207,15 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe } } + private static boolean logFailure(ResultJobRecord rjr) { + if (rjr == null) { + return true; + } + // don't re-log if the state is already failed + ResultJobRecord.Status status = rjr.getStatus(); + return status == null || status.getState() != State.FAILED; + } + /** * Compares the records already known by the client for the given job's result set id with the records that the * result directory service knows and if there are any newly discovered records returns a whole array with the @@ -264,7 +269,7 @@ public class ResultDirectoryService extends AbstractResultManager implements IRe class JobResultInfo { - private ResultJobRecord record; + private final ResultJobRecord record; private Waiters waiters; private Exception exception; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java index 260c6b94eb..38277c2af7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java @@ -121,4 +121,9 @@ public class FIFOJobQueue implements IJobQueue { public void clear() { jobListMap.clear(); } + + @Override + public int size() { + return jobListMap.size(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java index be40883f60..1f2c29a3cb 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java @@ -73,4 +73,11 @@ public interface IJobQueue { * Clears the job queue */ void clear(); + + /** + * Returns the number of queued jobs. + * + * @return the number of queued jobs. 
+ */ + int size(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java index 771832eeea..6630ba72b4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.messages.IMessage; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.deployment.DeploymentUtils; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,8 +35,8 @@ import org.apache.logging.log4j.Logger; public class ApplicationMessageWork extends AbstractHeartbeatWork { private static final Logger LOGGER = LogManager.getLogger(); - private byte[] message; - private DeploymentId deploymentId; + private final byte[] message; + private final DeploymentId deploymentId; public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId, String nodeId) { @@ -56,6 +57,11 @@ public class ApplicationMessageWork extends AbstractHeartbeatWork { } } + @Override + public Level logLevel() { + return Level.TRACE; + } + @Override public String toString() { return getName() + ": nodeID: " + nodeId; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java 
index c36b887c28..f08e209444 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java @@ -24,10 +24,12 @@ import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.logging.log4j.Level; public class GetNodeControllersInfoWork extends AbstractWork { + private final INodeManager nodeManager; - private IResultCallback<Map<String, NodeControllerInfo>> callback; + private final IResultCallback<Map<String, NodeControllerInfo>> callback; public GetNodeControllersInfoWork(INodeManager nodeManager, IResultCallback<Map<String, NodeControllerInfo>> callback) { @@ -39,4 +41,9 @@ public class GetNodeControllersInfoWork extends AbstractWork { public void run() { callback.setValue(nodeManager.getNodeControllerInfoMap()); } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java index bf95ff236b..b7dbd7546e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java @@ -22,8 +22,10 @@ import org.apache.hyracks.api.comm.NetworkAddress; import 
org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.logging.log4j.Level; public class GetResultDirectoryAddressWork extends SynchronizableWork { + private final ClusterControllerService ccs; private final IResultCallback<NetworkAddress> callback; @@ -42,4 +44,9 @@ public class GetResultDirectoryAddressWork extends SynchronizableWork { callback.setException(e); } } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java index d1d22697f9..1e34b96c04 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultPartitionLocationsWork.java @@ -28,6 +28,7 @@ import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.result.IResultDirectoryService; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.logging.log4j.Level; public class GetResultPartitionLocationsWork extends SynchronizableWork { private final ClusterControllerService ccs; @@ -68,4 +69,9 @@ public class GetResultPartitionLocationsWork extends SynchronizableWork { public String toString() { return getName() + ": JobId@" + jobId + " ResultSetId@" + rsId + " Known@" + Arrays.toString(knownRecords); } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java index 77d2f82f9c..6fe9909f05 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java @@ -28,17 +28,19 @@ import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class JobCleanupWork extends AbstractWork { + private static final Logger LOGGER = LogManager.getLogger(); - private IJobManager jobManager; - private JobId jobId; - private JobStatus status; - private List<Exception> exceptions; - private IResultCallback<Void> callback; + private final IJobManager jobManager; + private final JobId jobId; + private final JobStatus status; + private final List<Exception> exceptions; + private final IResultCallback<Void> callback; public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions, IResultCallback<Void> callback) { @@ -51,12 +53,10 @@ public class JobCleanupWork extends AbstractWork { @Override public void run() { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Cleanup for job: {}", jobId); - } + LOGGER.info("cleaning up {} on NCs, status={}", jobId, status); final JobRun jobRun = jobManager.get(jobId); if (jobRun == null) { - LOGGER.debug("Ignoring cleanup for unknown job: {}", jobId); + LOGGER.debug("ignoring cleanup for unknown {}", jobId); return; 
} try { @@ -80,4 +80,9 @@ public class JobCleanupWork extends AbstractWork { return getName() + ": JobId@" + jobId + " Status@" + status + (exceptions == null ? "" : " Exceptions@" + exceptions); } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java index cfedfc9d02..7606dc9a98 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java @@ -35,6 +35,7 @@ import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.deployment.DeploymentUtils; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.logging.log4j.Level; public class JobStartWork extends SynchronizableWork { private final ClusterControllerService ccs; @@ -85,4 +86,9 @@ public class JobStartWork extends SynchronizableWork { callback.setException(e); } } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java index c3a09f9da3..76a72c0da3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class JobletCleanupNotificationWork extends AbstractHeartbeatWork { + private static final Logger LOGGER = LogManager.getLogger(); private final JobId jobId; @@ -46,6 +47,7 @@ public class JobletCleanupNotificationWork extends AbstractHeartbeatWork { @Override public void runWork() { + LOGGER.debug("node {} finished job clean-up {}", nodeId, jobId); IJobManager jobManager = ccs.getJobManager(); final JobRun run = jobManager.get(jobId); if (run == null) { @@ -82,4 +84,9 @@ public class JobletCleanupNotificationWork extends AbstractHeartbeatWork { public String toString() { return getName() + " jobId:" + jobId + ", nodeId:" + nodeId; } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index ec21785baa..810fda24e4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -31,6 +31,7 @@ import org.apache.hyracks.control.common.controllers.NodeParameters; import org.apache.hyracks.control.common.controllers.NodeRegistration; import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy; import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -83,4 +84,9 @@ 
public class RegisterNodeWork extends SynchronizableWork { nc.sendRegistrationResult(params, e); } } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java index ee10669952..9f740ef375 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java @@ -50,9 +50,7 @@ public class RemoveDeadNodesWork extends AbstractWork { Collection<JobId> affectedJobIds = result.getRight(); int size = affectedJobIds.size(); if (size > 0) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Number of affected jobs: " + size); - } + LOGGER.info("number of affected jobs due to dead nodes removal {}", size); IJobManager jobManager = ccs.getJobManager(); for (JobId jobId : affectedJobIds) { JobRun run = jobManager.get(jobId); @@ -71,6 +69,6 @@ public class RemoveDeadNodesWork extends AbstractWork { @Override public Level logLevel() { - return Level.DEBUG; + return Level.TRACE; } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java index 0c531422ff..80dbd2ea67 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java @@ -29,8 +29,13 
@@ import org.apache.hyracks.control.cc.job.TaskAttempt; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.job.profiling.om.JobletProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class TaskCompleteWork extends AbstractTaskLifecycleWork { + + private static final Logger LOGGER = LogManager.getLogger(); private final TaskProfile statistics; public TaskCompleteWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId, @@ -46,6 +51,7 @@ public class TaskCompleteWork extends AbstractTaskLifecycleWork { if (run == null) { return; } + LOGGER.debug("node completed task {}:{}:{}", nodeId, jobId, taId); if (statistics != null) { JobProfile jobProfile = run.getJobProfile(); Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles(); @@ -63,4 +69,9 @@ public class TaskCompleteWork extends AbstractTaskLifecycleWork { public String toString() { return getName() + ": [" + nodeId + "[" + jobId + ":" + taId + "]"; } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java index 833066ea4a..48fd403c56 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java @@ -22,17 +22,14 @@ import java.util.List; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.job.JobId; -import 
org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.api.util.ErrorMessageUtil; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.cc.job.TaskAttempt; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public class TaskFailureWork extends AbstractTaskLifecycleWork { - private static final Logger LOGGER = LogManager.getLogger(); + private final List<Exception> exceptions; public TaskFailureWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId, @@ -43,9 +40,6 @@ public class TaskFailureWork extends AbstractTaskLifecycleWork { @Override protected void performEvent(TaskAttempt ta) { - Exception ex = exceptions.get(0); - LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN, - "Executing task failure work for " + this, ex); IJobManager jobManager = ccs.getJobManager(); JobRun run = jobManager.get(jobId); if (run == null) { @@ -57,6 +51,7 @@ public class TaskFailureWork extends AbstractTaskLifecycleWork { @Override public String toString() { - return getName() + ": [" + jobId + ":" + taId + ":" + nodeId + "]"; + return getName() + ": [" + jobId + ":" + taId + ":" + nodeId + "] " + + ErrorMessageUtil.getCauseMessage(exceptions.get(0)); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java index ed3e574323..63d5340176 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java +++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/WaitForJobCompletionWork.java @@ -29,6 +29,7 @@ import org.apache.hyracks.control.cc.job.IJobManager; import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.logging.log4j.Level; public class WaitForJobCompletionWork extends SynchronizableWork { private final ClusterControllerService ccs; @@ -92,4 +93,9 @@ public class WaitForJobCompletionWork extends SynchronizableWork { public String toString() { return getName() + " jobId:" + jobId; } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java index eee8950da4..e52e3acc66 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java @@ -52,7 +52,6 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition private boolean failed; protected boolean flushRequest; private boolean deallocated; - private Level openCloseLevel = Level.DEBUG; private Thread dataConsumerThread; public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid, @@ -181,9 +180,6 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition @Override public void open() throws HyracksDataException { - if 
(LOGGER.isEnabled(openCloseLevel)) { - LOGGER.log(openCloseLevel, "open(" + pid + " by " + taId); - } size = 0; eos = false; failed = false; @@ -215,9 +211,6 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition @Override public void close() throws HyracksDataException { - if (LOGGER.isEnabled(openCloseLevel)) { - LOGGER.log(openCloseLevel, "close(" + pid + " by " + taId); - } if (writeHandle != null) { ctx.getIoManager().close(writeHandle); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java index 6d4f1730b5..f386a89579 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java @@ -29,11 +29,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class ApplicationMessageWork extends AbstractWork { + private static final Logger LOGGER = LogManager.getLogger(); - private byte[] message; - private DeploymentId deploymentId; - private String nodeId; - private NodeControllerService ncs; + private final byte[] message; + private final DeploymentId deploymentId; + private final String nodeId; + private final NodeControllerService ncs; public ApplicationMessageWork(NodeControllerService ncs, byte[] message, DeploymentId deploymentId, String nodeId) { this.ncs = ncs; @@ -58,6 +59,11 @@ public class ApplicationMessageWork extends AbstractWork { } } + @Override + public Level logLevel() { + return Level.TRACE; + } + @Override public String toString() { return getName() + ": nodeId: " + nodeId; diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java index 75edd38b00..2036d7218f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java @@ -25,10 +25,12 @@ import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.Joblet; import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class CleanupJobletWork extends AbstractWork { + private static final Logger LOGGER = LogManager.getLogger(); private final NodeControllerService ncs; @@ -45,7 +47,7 @@ public class CleanupJobletWork extends AbstractWork { @Override public void run() { - LOGGER.debug("cleaning up after job: {}", jobId); + LOGGER.debug("cleaning up {}, status:{}", jobId, status); ncs.removeJobParameterByteStore(jobId); ncs.getPartitionManager().jobCompleted(jobId, status); Map<JobId, Joblet> jobletMap = ncs.getJobletMap(); @@ -59,4 +61,9 @@ public class CleanupJobletWork extends AbstractWork { public String toString() { return getName() + " jobId:" + jobId + ", status:" + status; } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java index 
60860c5afe..52469dc3b2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.control.nc.work; +import org.apache.hyracks.api.dataflow.TaskAttemptId; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; @@ -38,13 +40,16 @@ public class NotifyTaskCompleteWork extends AbstractWork { @Override public void run() { - TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile(), + JobId jobId = task.getJoblet().getJobId(); + TaskAttemptId taskAttemptId = task.getTaskAttemptId(); + LOGGER.debug("notifying CC of task complete {}:{}", jobId, taskAttemptId); + TaskProfile taskProfile = new TaskProfile(taskAttemptId, task.getPartitionSendProfile(), task.getStatsCollector(), task.getWarnings(), task.getWarningCollector().getTotalWarningsCount()); try { - ncs.getClusterController(task.getJobletContext().getJobId().getCcId()).notifyTaskComplete( - task.getJobletContext().getJobId(), task.getTaskAttemptId(), ncs.getId(), taskProfile); + ncs.getClusterController(task.getJobletContext().getJobId().getCcId()) + .notifyTaskComplete(task.getJobletContext().getJobId(), taskAttemptId, ncs.getId(), taskProfile); } catch (Exception e) { - LOGGER.log(Level.ERROR, "Failed notifying task complete for " + task.getTaskAttemptId(), e); + LOGGER.log(Level.ERROR, "Failed notifying task complete for {}", taskAttemptId, e); } task.getJoblet().removeTask(task); } @@ -54,4 +59,9 @@ public class NotifyTaskCompleteWork extends AbstractWork { return getName() + ": [" + ncs.getId() + "[" + 
task.getJoblet().getJobId() + ":" + task.getTaskAttemptId() + "]"; } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java index b0c60aa3d3..cd79da756c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.result.IResultPartitionManager; -import org.apache.hyracks.api.util.ExceptionUtils; +import org.apache.hyracks.api.util.ErrorMessageUtil; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.Task; @@ -50,9 +50,6 @@ public class NotifyTaskFailureWork extends AbstractWork { @Override public void run() { - Exception ex = exceptions.get(0); - LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : Level.WARN, "task " + taskId + " has failed", - ex); try { IResultPartitionManager resultPartitionManager = ncs.getResultPartitionManager(); if (resultPartitionManager != null) { @@ -69,6 +66,8 @@ public class NotifyTaskFailureWork extends AbstractWork { @Override public String toString() { - return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]"; + return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]" + + ((exceptions != null && !exceptions.isEmpty()) + ? 
" " + ErrorMessageUtil.getCauseMessage(exceptions.get(0)) : ""); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index f277046496..dd4a956bd3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -318,6 +318,6 @@ public class StartTasksWork extends AbstractWork { @Override public String toString() { - return getName() + " jobId:" + jobId; + return getName() + " jobId:" + jobId + " tasks:" + taskDescriptors.size(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index 555e8fbfa3..0f31491feb 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -319,9 +319,6 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD if (!failed) { state.hybridHJ.closeBuild(); ctx.setStateObject(state); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("OptimizedHybridHashJoin closed its build phase"); - } } else { state.hybridHJ.clearBuildTempFiles(); } @@ -402,10 +399,6 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD writer.open(); 
state.hybridHJ.initProbe(probComp); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase."); - } } @Override @@ -416,7 +409,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD @Override public void fail() throws HyracksDataException { failed = true; - if (state.hybridHJ != null) { + if (state != null && state.hybridHJ != null) { state.hybridHJ.fail(); } writer.fail(); @@ -427,12 +420,13 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD if (failed) { try { // Clear temp files if fail() was called. - state.hybridHJ.clearBuildTempFiles(); - state.hybridHJ.clearProbeTempFiles(); + if (state != null && state.hybridHJ != null) { + state.hybridHJ.clearBuildTempFiles(); + state.hybridHJ.clearProbeTempFiles(); + } } finally { writer.close(); // writer should always be closed. } - logProbeComplete(); return; } try { @@ -477,17 +471,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD // Re-throw the whatever is caught. 
throw e; } finally { - try { - logProbeComplete(); - } finally { - writer.close(); - } - } - } - - private void logProbeComplete() { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("OptimizedHybridHashJoin closed its probe phase"); + writer.close(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java index 08f15b3669..a1704ec077 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java @@ -41,13 +41,9 @@ import org.apache.hyracks.dataflow.std.structures.IResetableComparable; import org.apache.hyracks.dataflow.std.structures.IResetableComparableFactory; import org.apache.hyracks.dataflow.std.structures.MaxHeap; import org.apache.hyracks.dataflow.std.structures.TuplePointer; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public class TupleSorterHeapSort implements ITupleSorter { - private static final Logger LOGGER = LogManager.getLogger(); - class HeapEntryFactory implements IResetableComparableFactory<HeapEntry> { @Override public IResetableComparable<HeapEntry> createResetableComparable() { @@ -288,7 +284,6 @@ public class TupleSorterHeapSort implements ITupleSorter { int maxFrameSize = outputFrame.getFrameSize(); int numEntries = heap.getNumEntries(); IResetableComparable[] entries = heap.getEntries(); - int io = 0; for (int i = 0; i < numEntries; i++) { HeapEntry minEntry = (HeapEntry) entries[i]; bufferAccessor1.reset(minEntry.tuplePointer); @@ -296,14 +291,10 @@ public class TupleSorterHeapSort implements ITupleSorter { bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength()); if (flushed > 0) 
{ maxFrameSize = Math.max(maxFrameSize, flushed); - io++; } } maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize()); outputAppender.write(writer, true); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames"); - } return maxFrameSize; } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index be22b9ca0d..7a75a0fcca 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -34,7 +34,9 @@ import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.ClusterCapacity; import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; import org.apache.hyracks.api.result.IResultSet; import org.apache.hyracks.api.result.IResultSetReader; import org.apache.hyracks.client.result.ResultSet; @@ -254,6 +256,14 @@ public abstract class AbstractMultiNCIntegrationTest { public void release(JobSpecification job) { } + + @Override + public IReadOnlyClusterCapacity getClusterCapacity() { + ClusterCapacity clusterCapacity = new ClusterCapacity(); + clusterCapacity.setAggregatedMemoryByteSize(maxRAM); + clusterCapacity.setAggregatedCores(Integer.MAX_VALUE); + return clusterCapacity; + } }; } } diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java index 1ff9fa8b25..842ec61527 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java @@ -34,7 +34,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -80,11 +79,7 @@ public class ComponentUtils { */ public static void get(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { - boolean loggable = LOGGER.isDebugEnabled(); value.reset(); - if (loggable) { - LOGGER.log(Level.DEBUG, "Getting " + key + " from index " + index); - } // Lock the opTracker to ensure index components don't change synchronized (index.getOperationTracker()) { ILSMMemoryComponent cmc = index.getCurrentMemoryComponent(); @@ -92,33 +87,17 @@ public class ComponentUtils { index.getCurrentMemoryComponent().getMetadata().get(key, value); } if (value.getLength() == 0) { - if (loggable) { - LOGGER.log(Level.DEBUG, key + " was not found in mutable memory component of " + index); - } - // was not found in the in current mutable component, search in the other in memory components + // was not found in the in current mutable component, search in the other in-memory components 
fromImmutableMemoryComponents(index, key, value); if (value.getLength() == 0) { - if (loggable) { - LOGGER.log(Level.DEBUG, key + " was not found in all immmutable memory components of " + index); - } - // was not found in the in all in memory components, search in the disk components + // was not found in all in-memory components, search in the disk components fromDiskComponents(index, key, value); - if (loggable) { - if (value.getLength() == 0) { - LOGGER.log(Level.DEBUG, key + " was not found in all disk components of " + index); - } else { - LOGGER.log(Level.DEBUG, key + " was found in disk components of " + index); - } - } - } else { - if (loggable) { - LOGGER.log(Level.DEBUG, key + " was found in the immutable memory components of " + index); + if (value.getLength() == 0) { + LOGGER.debug("{} was NOT found", key); } } } else { - if (loggable) { - LOGGER.log(Level.DEBUG, key + " was found in mutable memory component of " + index); - } + LOGGER.debug("{} was found in mutable memory component {}", key, cmc); } } } @@ -143,17 +122,11 @@ public class ComponentUtils { private static void fromDiskComponents(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { - boolean loggable = LOGGER.isDebugEnabled(); - if (loggable) { - LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components of " + index); - } for (ILSMDiskComponent c : index.getDiskComponents()) { - if (loggable) { - LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components " + c); - } c.getMetadata().get(key, value); if (value.getLength() != 0) { // Found + LOGGER.debug("{} was found in disk component {}", key, c); return; } } @@ -161,21 +134,10 @@ public class ComponentUtils { private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { - boolean loggable = LOGGER.isDebugEnabled(); - if (loggable) { - LOGGER.log(Level.DEBUG, "Getting " + key + " from immutable 
memory components of " + index); - } List<ILSMMemoryComponent> memComponents = index.getMemoryComponents(); int numOtherMemComponents = memComponents.size() - 1; int next = index.getCurrentMemoryComponentIndex(); - if (loggable) { - LOGGER.log(Level.DEBUG, index + " has " + numOtherMemComponents + " immutable memory components"); - } for (int i = 0; i < numOtherMemComponents; i++) { - if (loggable) { - LOGGER.log(Level.DEBUG, - "trying to get " + key + " from immutable memory components number: " + (i + 1)); - } next = next - 1; if (next < 0) { next = memComponents.size() - 1; @@ -185,6 +147,7 @@ public class ComponentUtils { c.getMetadata().get(key, value); if (value.getLength() != 0) { // Found + LOGGER.debug("{} was found in immutable memory component {}", key, c); return; } }
