This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit e0d0c951a8d3bb0d86e1deef34a9b1893f9352c2 Author: Ali Alsuliman <[email protected]> AuthorDate: Sat May 25 04:03:57 2024 +0300 [NO ISSUE][OTH] Logging enhancements + query job logging - user model changes: no - storage format changes: no - interface changes: yes Details: - Add context to cancel request/response. - Reduce some JobWork to TRACE. - Avoid NPE when closing the pipeline. - Add method to get Job Queue size. - Add method to IJobCapacityController to get cluster current capacity for logging. - Remove not useful logs. Change-Id: Iec2c7af1b14bf99b1d11b4178be66da860dfcbf4 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18318 Reviewed-by: Michael Blow <[email protected]> Tested-by: Ali Alsuliman <[email protected]> --- .../app/active/ActiveEntityEventsListener.java | 59 +++++++--------------- .../app/active/ActiveNotificationHandler.java | 8 ++- .../asterix/app/message/CancelQueryRequest.java | 5 ++ .../asterix/app/message/CancelQueryResponse.java | 4 ++ .../evaluators/functions/SleepDescriptor.java | 8 +-- .../job/resource/JobCapacityController.java | 5 ++ ...stractOneInputOneOutputOneFramePushRuntime.java | 2 +- .../job/resource/DefaultJobCapacityController.java | 10 ++++ .../api/job/resource/IJobCapacityController.java | 6 +++ .../apache/hyracks/control/cc/job/JobManager.java | 22 ++++++++ .../hyracks/control/cc/scheduler/FIFOJobQueue.java | 5 ++ .../hyracks/control/cc/scheduler/IJobQueue.java | 7 +++ .../control/cc/work/ApplicationMessageWork.java | 10 +++- .../cc/work/GetNodeControllersInfoWork.java | 8 ++- .../control/cc/work/RemoveDeadNodesWork.java | 2 +- .../MaterializingPipelinedPartition.java | 7 --- .../control/nc/work/ApplicationMessageWork.java | 13 +++-- .../control/nc/work/NotifyTaskFailureWork.java | 5 +- .../OptimizedHybridHashJoinOperatorDescriptor.java | 28 +++------- .../dataflow/std/sort/TupleSorterHeapSort.java | 9 ---- .../AbstractMultiNCIntegrationTest.java | 10 ++++ 21 files changed, 136 insertions(+), 97 deletions(-) diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 626b93828e..63d884660e 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -153,9 +153,8 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void notify(ActiveEvent event) { try { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "EventListener is notified."); - } + LOGGER.debug("CC handling event {}; state={}, prev state={}, suspended={}", event, state, prevState, + suspended); ActiveEvent.Kind eventKind = event.getEventKind(); switch (eventKind) { case JOB_CREATED: @@ -194,26 +193,21 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @SuppressWarnings("unchecked") protected void finish(ActiveEvent event) throws HyracksDataException { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Active job {} finished", jobId); - } JobId lastJobId = jobId; + Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); if (numRegistered != numDeRegistered) { LOGGER.log(Level.WARN, - "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId, - numRegistered, numDeRegistered); + "ingestion job {} finished with status={}, reported runtime registrations={}, deregistrations={}", + jobId, status, numRegistered, numDeRegistered); } jobId = null; - Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); JobStatus jobStatus = status.getLeft(); List<Exception> exceptions = status.getRight(); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus); - } + LOGGER.debug("ingestion job {} finished with status {}", lastJobId, jobStatus); if (!jobSuccessfullyTerminated(jobStatus)) { jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) : exceptions.get(0); - LOGGER.error("Active Job {} failed", lastJobId, jobFailure); + LOGGER.error("ingestion job {} failed", lastJobId, jobFailure); setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED); if (prevState == ActivityState.RUNNING) { @@ -371,16 +365,14 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void recover() { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Recover is called on {}", entityId); - } if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { - LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); + LOGGER.debug("recover is called on {} w/o recovery policy; setting to permanent failure", entityId); setState(ActivityState.STOPPED); } else { + LOGGER.debug("recover is called on {}", entityId); ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor(); setState(ActivityState.TEMPORARILY_FAILED); - LOGGER.log(level, "Recovery task has been submitted"); + LOGGER.debug("recovery task has been submitted"); rt = createRecoveryTask(); executor.submit(rt.recover()); } @@ -481,13 +473,9 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl try { Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); sendStopMessages(metadataProvider, timeout, unit); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Waiting for its state to become " + waitFor); - } + LOGGER.debug("waiting for {} to become {}", jobId, waitFor); subscriber.sync(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Disconnect has been completed " + waitFor); - } + LOGGER.debug("disconnect has been completed {}", waitFor); } catch (InterruptedException ie) { forceStop(subscriber, ie); Thread.currentThread().interrupt(); @@ -513,13 +501,8 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl ICCMessageBroker messageBroker = (ICCMessageBroker) applicationCtx.getServiceContext().getMessageBroker(); AlgebricksAbsolutePartitionConstraint runtimeLocations = getLocations(); int partition = 0; - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations); - } + LOGGER.log(Level.INFO, "Sending stop messages to {}", runtimeLocations); for (String location : runtimeLocations.getLocations()) { - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, "Sending to " + location); - } ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++); messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, runtimeId, new StopRuntimeParameters(timeout, unit)), location); @@ -581,14 +564,10 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl WaitForStateSubscriber subscriber; Future<Void> suspendTask; synchronized (this) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "suspending entity " + entityId); - LOGGER.log(level, "Waiting for ongoing activities"); - } + LOGGER.log(level, "{} Suspending entity {}", jobId, entityId); + LOGGER.log(level, "{} Waiting for ongoing activities", jobId); waitForNonTransitionState(); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Proceeding with suspension. Current state is " + state); - } + LOGGER.log(level, "{} Proceeding with suspension. Current state is {}", jobId, state); if (state == ActivityState.STOPPED) { suspended = true; return; @@ -609,12 +588,12 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl doSuspend(metadataProvider); return null; }); - LOGGER.log(level, "Suspension task has been submitted"); + LOGGER.log(level, "{} Suspension task has been submitted", jobId); } try { - LOGGER.log(level, "Waiting for suspension task to complete"); + LOGGER.log(level, "{} Waiting for suspension task to complete", jobId); suspendTask.get(); - LOGGER.log(level, "waiting for state to become SUSPENDED or TEMPORARILY_FAILED"); + LOGGER.log(level, "{} Waiting for state to become SUSPENDED or TEMPORARILY_FAILED", jobId); subscriber.sync(); suspended = true; } catch (Exception e) { diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 3c277d5aa6..1f3169895d 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -92,16 +92,14 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "notifyJobCreation was called for job {}", jobId); - } Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); if (!(property instanceof EntityId)) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property); + if (property != null) { + LOGGER.debug("{} is not an active job. job property={}", jobId, property); } return; } + LOGGER.debug("notified of ingestion job creation {}", jobId); EntityId entityId = (EntityId) property; monitorJob(jobId, entityId); boolean found = jobId2EntityId.get(jobId) != null; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java index 65d1039aff..04e661bf3a 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java @@ -82,4 +82,9 @@ public class CancelQueryRequest implements ICcAddressedMessage { } } + @Override + public String toString() { + return String.format("%s(id=%s, uuid=%s, contextId=%s, node=%s)", getClass().getSimpleName(), reqId, uuid, + contextId, nodeId); + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java index d65ae316ee..a711b732eb 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java @@ -49,4 +49,8 @@ public class CancelQueryResponse implements INcAddressedMessage { return status; } + @Override + public String toString() { + return String.format("%s(id=%s, status=%s)", getClass().getSimpleName(), reqId, status); + } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java index ac87f7e264..ef348fe0a7 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java @@ -64,16 +64,16 @@ public class SleepDescriptor extends AbstractScalarFunctionDynamicDescriptor { final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset); try { - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, + if (LOGGER.isTraceEnabled()) { + LOGGER.log(Level.TRACE, ctx.getTaskContext().getTaskAttemptId() + " sleeping for " + time + " ms"); } Thread.sleep(time); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, + if (LOGGER.isTraceEnabled()) { + LOGGER.log(Level.TRACE, ctx.getTaskContext().getTaskAttemptId() + " done sleeping for " + time + " ms"); } } diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java index b123a5ed2e..ae903d1f71 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/job/resource/JobCapacityController.java @@ -77,6 +77,11 @@ public class JobCapacityController implements IJobCapacityController { ensureMaxCapacity(); } + @Override + public IReadOnlyClusterCapacity getClusterCapacity() { + return resourceManager.getCurrentCapacity(); + } + private void ensureMaxCapacity() { final IClusterCapacity currentCapacity = resourceManager.getCurrentCapacity(); final IReadOnlyClusterCapacity maximumCapacity = resourceManager.getMaximumCapacity(); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java index 9f4541f582..0c74260d6e 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/operators/base/AbstractOneInputOneOutputOneFramePushRuntime.java @@ -74,7 +74,7 @@ public abstract class AbstractOneInputOneOutputOneFramePushRuntime extends Abstr } protected void flushIfNotFailed() throws HyracksDataException { - if (!failed && appender.getTupleCount() > 0) { + if (!failed && appender != null && appender.getTupleCount() > 0) { flushAndReset(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java index 9e38a20991..b18bcb10ee 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/DefaultJobCapacityController.java @@ -24,6 +24,11 @@ import org.apache.hyracks.api.job.JobSpecification; public class DefaultJobCapacityController implements IJobCapacityController { public static final DefaultJobCapacityController INSTANCE = new DefaultJobCapacityController(); + private static final IClusterCapacity CAPACITY = new ClusterCapacity(); + static { + CAPACITY.setAggregatedCores(Integer.MAX_VALUE); + CAPACITY.setAggregatedMemoryByteSize(Long.MAX_VALUE); + } private DefaultJobCapacityController() { } @@ -37,4 +42,9 @@ public class DefaultJobCapacityController implements IJobCapacityController { public void release(JobSpecification job) { // No operation here. } + + @Override + public IReadOnlyClusterCapacity getClusterCapacity() { + return CAPACITY; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java index 5fa4bd9d83..f88baa2ee9 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/resource/IJobCapacityController.java @@ -57,4 +57,10 @@ public interface IJobCapacityController { */ void release(JobSpecification job); + /** + * The cluster current capacity. + * + * @return the cluster current capacity. + */ + IReadOnlyClusterCapacity getClusterCapacity(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java index 33bdd0558a..234744933a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java @@ -38,7 +38,9 @@ import org.apache.hyracks.api.job.ActivityClusterGraph; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.IClusterCapacity; import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; import org.apache.hyracks.api.util.ExceptionUtils; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.cc.NodeControllerState; @@ -312,6 +314,7 @@ public class JobManager implements IJobManager { run.setStartTime(System.currentTimeMillis()); run.setStartTimeZoneId(ZoneId.systemDefault().getId()); JobId jobId = run.getJobId(); + logJobCapacity(run, "running"); activeRunMap.put(jobId, run); run.setStatus(JobStatus.RUNNING, null); executeJobInternal(run); @@ -319,6 +322,7 @@ public class JobManager implements IJobManager { // Queue a job when the required capacity for the job is not met. private void queueJob(JobRun jobRun) throws HyracksException { + logJobCapacity(jobRun, "queueing"); jobRun.setStatus(JobStatus.PENDING, null); jobQueue.add(jobRun); } @@ -354,5 +358,23 @@ public class JobManager implements IJobManager { private void releaseJobCapacity(JobRun jobRun) { final JobSpecification job = jobRun.getJobSpecification(); jobCapacityController.release(job); + logJobCapacity(jobRun, "released"); + } + + private void logJobCapacity(JobRun jobRun, String jobStateDesc) { + IClusterCapacity requiredResources = jobRun.getJobSpecification().getRequiredClusterCapacity(); + if (requiredResources == null) { + return; + } + long requiredMemory = requiredResources.getAggregatedMemoryByteSize(); + int requiredCPUs = requiredResources.getAggregatedCores(); + if (requiredMemory == 0 && requiredCPUs == 0) { + return; + } + IReadOnlyClusterCapacity clusterCapacity = jobCapacityController.getClusterCapacity(); + LOGGER.info("{} {}, memory={}, cpu={}, (new) cluster memory={}, cpu={}, currently running={}, queued={}", + jobStateDesc, jobRun.getJobId(), requiredMemory, requiredCPUs, + clusterCapacity.getAggregatedMemoryByteSize(), clusterCapacity.getAggregatedCores(), + getRunningJobsCount(), jobQueue.size()); } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java index 260c6b94eb..38277c2af7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/FIFOJobQueue.java @@ -121,4 +121,9 @@ public class FIFOJobQueue implements IJobQueue { public void clear() { jobListMap.clear(); } + + @Override + public int size() { + return jobListMap.size(); + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java index be40883f60..1f2c29a3cb 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/scheduler/IJobQueue.java @@ -73,4 +73,11 @@ public interface IJobQueue { * Clears the job queue */ void clear(); + + /** + * Returns the number of queued jobs. + * + * @return the number of queued jobs. + */ + int size(); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java index 771832eeea..4f77914b2f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.messages.IMessage; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.deployment.DeploymentUtils; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,8 +35,8 @@ import org.apache.logging.log4j.Logger; public class ApplicationMessageWork extends AbstractHeartbeatWork { private static final Logger LOGGER = LogManager.getLogger(); - private byte[] message; - private DeploymentId deploymentId; + private final byte[] message; + private final DeploymentId deploymentId; public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId, String nodeId) { @@ -61,6 +62,11 @@ public class ApplicationMessageWork extends AbstractHeartbeatWork { return getName() + ": nodeID: " + nodeId; } + @Override + public Level logLevel() { + return Level.TRACE; + } + private static void notifyMessageBroker(ICCServiceContext ctx, IMessage msg, String nodeId) { final ExecutorService executor = ctx.getControllerService().getExecutor(); executor.execute(() -> { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java index c36b887c28..73c0c7d86a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java @@ -24,10 +24,11 @@ import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.logging.log4j.Level; public class GetNodeControllersInfoWork extends AbstractWork { private final INodeManager nodeManager; - private IResultCallback<Map<String, NodeControllerInfo>> callback; + private final IResultCallback<Map<String, NodeControllerInfo>> callback; public GetNodeControllersInfoWork(INodeManager nodeManager, IResultCallback<Map<String, NodeControllerInfo>> callback) { @@ -39,4 +40,9 @@ public class GetNodeControllersInfoWork extends AbstractWork { public void run() { callback.setValue(nodeManager.getNodeControllerInfoMap()); } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java index ee10669952..cbdf98a2e2 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java @@ -71,6 +71,6 @@ public class RemoveDeadNodesWork extends AbstractWork { @Override public Level logLevel() { - return Level.DEBUG; + return Level.TRACE; } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java index eee8950da4..e52e3acc66 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/partitions/MaterializingPipelinedPartition.java @@ -52,7 +52,6 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition private boolean failed; protected boolean flushRequest; private boolean deallocated; - private Level openCloseLevel = Level.DEBUG; private Thread dataConsumerThread; public MaterializingPipelinedPartition(IHyracksTaskContext ctx, PartitionManager manager, PartitionId pid, @@ -181,9 +180,6 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition @Override public void open() throws HyracksDataException { - if (LOGGER.isEnabled(openCloseLevel)) { - LOGGER.log(openCloseLevel, "open(" + pid + " by " + taId); - } size = 0; eos = false; failed = false; @@ -215,9 +211,6 @@ public class MaterializingPipelinedPartition implements IFrameWriter, IPartition @Override public void close() throws HyracksDataException { - if (LOGGER.isEnabled(openCloseLevel)) { - LOGGER.log(openCloseLevel, "close(" + pid + " by " + taId); - } if (writeHandle != null) { ctx.getIoManager().close(writeHandle); } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java index 6d4f1730b5..d1360d84d0 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java @@ -30,10 +30,10 @@ import org.apache.logging.log4j.Logger; public class ApplicationMessageWork extends AbstractWork { private static final Logger LOGGER = LogManager.getLogger(); - private byte[] message; - private DeploymentId deploymentId; - private String nodeId; - private NodeControllerService ncs; + private final byte[] message; + private final DeploymentId deploymentId; + private final String nodeId; + private final NodeControllerService ncs; public ApplicationMessageWork(NodeControllerService ncs, byte[] message, DeploymentId deploymentId, String nodeId) { this.ncs = ncs; @@ -62,4 +62,9 @@ public class ApplicationMessageWork extends AbstractWork { public String toString() { return getName() + ": nodeId: " + nodeId; } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java index 6dd430720a..cd79da756c 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.result.IResultPartitionManager; +import org.apache.hyracks.api.util.ErrorMessageUtil; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; import org.apache.hyracks.control.nc.Task; @@ -65,6 +66,8 @@ public class NotifyTaskFailureWork extends AbstractWork { @Override public String toString() { - return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]"; + return getName() + ": [" + ncs.getId() + "[" + jobId + ":" + taskId + "]" + + ((exceptions != null && !exceptions.isEmpty()) + ? " " + ErrorMessageUtil.getCauseMessage(exceptions.get(0)) : ""); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java index 555e8fbfa3..0f31491feb 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java @@ -319,9 +319,6 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD if (!failed) { state.hybridHJ.closeBuild(); ctx.setStateObject(state); - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("OptimizedHybridHashJoin closed its build phase"); - } } else { state.hybridHJ.clearBuildTempFiles(); } @@ -402,10 +399,6 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD writer.open(); state.hybridHJ.initProbe(probComp); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("OptimizedHybridHashJoin is starting the probe phase."); - } } @Override @@ -416,7 +409,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD @Override public void fail() throws HyracksDataException { failed = true; - if (state.hybridHJ != null) { + if (state != null && state.hybridHJ != null) { state.hybridHJ.fail(); } writer.fail(); @@ -427,12 +420,13 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD if (failed) { try { // Clear temp files if fail() was called. - state.hybridHJ.clearBuildTempFiles(); - state.hybridHJ.clearProbeTempFiles(); + if (state != null && state.hybridHJ != null) { + state.hybridHJ.clearBuildTempFiles(); + state.hybridHJ.clearProbeTempFiles(); + } } finally { writer.close(); // writer should always be closed. } - logProbeComplete(); return; } try { @@ -477,17 +471,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor extends AbstractOperatorD // Re-throw the whatever is caught. throw e; } finally { - try { - logProbeComplete(); - } finally { - writer.close(); - } - } - } - - private void logProbeComplete() { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("OptimizedHybridHashJoin closed its probe phase"); + writer.close(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java index 08f15b3669..a1704ec077 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/sort/TupleSorterHeapSort.java @@ -41,13 +41,9 @@ import org.apache.hyracks.dataflow.std.structures.IResetableComparable; import org.apache.hyracks.dataflow.std.structures.IResetableComparableFactory; import org.apache.hyracks.dataflow.std.structures.MaxHeap; import org.apache.hyracks.dataflow.std.structures.TuplePointer; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public class TupleSorterHeapSort implements ITupleSorter { - private static final Logger LOGGER = LogManager.getLogger(); - class HeapEntryFactory implements IResetableComparableFactory<HeapEntry> { @Override public IResetableComparable<HeapEntry> createResetableComparable() { @@ -288,7 +284,6 @@ public class TupleSorterHeapSort implements ITupleSorter { int maxFrameSize = outputFrame.getFrameSize(); int numEntries = heap.getNumEntries(); IResetableComparable[] entries = heap.getEntries(); - int io = 0; for (int i = 0; i < numEntries; i++) { HeapEntry minEntry = (HeapEntry) entries[i]; bufferAccessor1.reset(minEntry.tuplePointer); @@ -296,14 +291,10 @@ public class TupleSorterHeapSort implements ITupleSorter { bufferAccessor1.getTupleStartOffset(), bufferAccessor1.getTupleLength()); if (flushed > 0) { maxFrameSize = Math.max(maxFrameSize, flushed); - io++; } } maxFrameSize = Math.max(maxFrameSize, outputFrame.getFrameSize()); outputAppender.write(writer, true); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Flushed records:" + numEntries + "; Flushed through " + (io + 1) + " frames"); - } return maxFrameSize; } diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java index be22b9ca0d..7a75a0fcca 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AbstractMultiNCIntegrationTest.java @@ -34,7 +34,9 @@ import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.job.JobStatus; +import org.apache.hyracks.api.job.resource.ClusterCapacity; import org.apache.hyracks.api.job.resource.IJobCapacityController; +import org.apache.hyracks.api.job.resource.IReadOnlyClusterCapacity; import org.apache.hyracks.api.result.IResultSet; import org.apache.hyracks.api.result.IResultSetReader; import org.apache.hyracks.client.result.ResultSet; @@ -254,6 +256,14 @@ public abstract class AbstractMultiNCIntegrationTest { public void release(JobSpecification job) { } + + @Override + public IReadOnlyClusterCapacity getClusterCapacity() { + ClusterCapacity clusterCapacity = new ClusterCapacity(); + clusterCapacity.setAggregatedMemoryByteSize(maxRAM); + clusterCapacity.setAggregatedCores(Integer.MAX_VALUE); + return clusterCapacity; + } }; } }
