This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 1f82cd32338b0bcd08ab567c84d2d34edf43f402 Author: Ali Alsuliman <[email protected]> AuthorDate: Sun Dec 17 16:19:37 2023 -0800 [NO ISSUE][OTH] Improve logging - user model changes: no - storage format changes: no - interface changes: no Details: - Add description to Active generic messages. - Include uuid, context_id in request messages. - Make some *Work logging at TRACE. Change-Id: Ic6c335067fbd996df9bba51028858322158d598d Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18016 Reviewed-by: Ali Alsuliman <[email protected]> Reviewed-by: Wail Alkowaileet <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> --- .../org/apache/asterix/active/ActiveEvent.java | 8 ++- .../org/apache/asterix/active/ActiveManager.java | 6 +- .../active/ActiveSourceOperatorNodePushable.java | 21 ++++-- .../active/message/ActiveManagerMessage.java | 15 +++-- .../active/message/ActivePartitionMessage.java | 14 +++- .../active/message/ActiveStatsRequestMessage.java | 2 +- .../api/http/server/RebalanceApiServlet.java | 2 +- .../app/active/ActiveEntityEventsListener.java | 49 +++++--------- .../app/active/ActiveNotificationHandler.java | 77 +++++++--------------- .../asterix/app/message/CancelQueryRequest.java | 11 ++-- .../asterix/app/message/CancelQueryResponse.java | 4 ++ .../message/ExecuteStatementRequestMessage.java | 8 ++- .../message/ExecuteStatementResponseMessage.java | 13 ++-- .../message/RegistrationTasksResponseMessage.java | 5 ++ .../org/apache/asterix/common/TestDataUtil.java | 2 +- .../asterix/test/active/ActiveStatsTest.java | 2 +- .../asterix/test/active/RuntimeRegistration.java | 2 +- .../test/active/TestNodeControllerActor.java | 2 +- .../apache/asterix/test/active/TestUserActor.java | 2 +- .../common/context/DatasetLifecycleManager.java | 2 +- .../ioopcallbacks/LSMIOOperationCallback.java | 5 +- .../src/main/resources/asx_errormsg/en.properties | 4 +- .../dataflow/FeedRecordDataFlowController.java | 14 +++- .../src/main/resources/errormsg/en.properties | 2 +- .../control/cc/work/ApplicationMessageWork.java | 10 ++- .../cc/work/GetNodeControllersInfoWork.java | 9 ++- .../cc/work/GetResultDirectoryAddressWork.java | 7 ++ .../hyracks/control/cc/work/JobCleanupWork.java | 17 +++-- .../hyracks/control/cc/work/JobStartWork.java | 11 ++++ .../cc/work/JobletCleanupNotificationWork.java | 7 ++ .../hyracks/control/cc/work/RegisterNodeWork.java | 6 ++ .../control/cc/work/RemoveDeadNodesWork.java | 4 +- .../hyracks/control/cc/work/TaskCompleteWork.java | 12 ++++ .../control/nc/work/ApplicationMessageWork.java | 14 ++-- .../hyracks/control/nc/work/CleanupJobletWork.java | 9 ++- .../control/nc/work/NotifyTaskCompleteWork.java | 21 ++++-- .../hyracks/control/nc/work/StartTasksWork.java | 2 +- .../am/common/dataflow/IndexDataflowHelper.java | 3 +- .../am/common/dataflow/IndexLifecycleManager.java | 2 +- .../lsm/common/impls/AbstractLSMDiskComponent.java | 9 ++- .../am/lsm/common/impls/AbstractLSMIndex.java | 10 ++- .../common/impls/AbstractLSMMemoryComponent.java | 6 +- .../am/lsm/common/impls/EmptyComponent.java | 5 ++ .../storage/am/lsm/common/impls/LSMHarness.java | 24 +++---- .../storage/am/lsm/common/util/ComponentUtils.java | 51 ++------------ 45 files changed, 278 insertions(+), 233 deletions(-) diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java index d6d9b9650f..279bba1214 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveEvent.java @@ -20,6 +20,7 @@ package org.apache.asterix.active; import java.util.Objects; +import org.apache.asterix.active.message.ActivePartitionMessage; import org.apache.hyracks.api.job.JobId; public class ActiveEvent { @@ -65,7 +66,12 @@ public class ActiveEvent { @Override public String toString() { - return "JobId:" + jobId + "," + "EntityId:" + entityId + ", " + "Kind" + eventKind; + String kindDesc = ""; + if (eventObject instanceof ActivePartitionMessage) { + ActivePartitionMessage partitionEvent = (ActivePartitionMessage) eventObject; + kindDesc = '-' + String.valueOf(partitionEvent.getEvent()) + '(' + partitionEvent.getDesc() + ')'; + } + return jobId + ", " + "EntityId:" + entityId + ", " + "Kind:" + eventKind + kindDesc; } @Override diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java index b99e4f2aec..636279cb09 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveManager.java @@ -103,8 +103,8 @@ public class ActiveManager { return ActiveManager.class.getSimpleName() + "[" + nodeId + "]"; } - public void submit(ActiveManagerMessage message) throws HyracksDataException { - LOGGER.debug("Message of type {} received in {}", message.getKind(), nodeId); + public void handle(ActiveManagerMessage message) throws HyracksDataException { + LOGGER.debug("NC handling {}({})({})", message.getKind(), message.getRuntimeId(), message.getDesc()); switch (message.getKind()) { case STOP_ACTIVITY: stopRuntime(message); @@ -125,7 +125,7 @@ public class ActiveManager { ActiveRuntimeId runtimeId = message.getRuntimeId(); IActiveRuntime runtime = runtimes.get(runtimeId); if (runtime == null) { - LOGGER.warn("Request for a runtime {} that is not registered {}", runtimeId, message); + LOGGER.warn("not handling {} for a runtime {} that is not registered", message, runtimeId); return; } runtime.handleGenericEvent(message); diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java index 453ffa0afa..9123503590 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/ActiveSourceOperatorNodePushable.java @@ -27,7 +27,6 @@ import org.apache.hyracks.api.comm.IFrameWriter; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -98,14 +97,14 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp try { // notify cc that runtime has been registered ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - Event.RUNTIME_REGISTERED, null), null); + Event.RUNTIME_REGISTERED, null, ""), null); start(); } catch (InterruptedException e) { - LOGGER.log(Level.INFO, "initialize() interrupted on ActiveSourceOperatorNodePushable", e); + LOGGER.info("ingestion op interrupted", e); Thread.currentThread().interrupt(); throw HyracksDataException.create(e); } catch (Exception e) { - LOGGER.log(Level.INFO, "initialize() failed on ActiveSourceOperatorNodePushable", e); + logIngestionFailure(e); throw HyracksDataException.create(e); } finally { synchronized (this) { @@ -121,12 +120,12 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp activeManager.deregisterRuntime(runtimeId); try { ctx.sendApplicationMessageToCC(new ActivePartitionMessage(runtimeId, ctx.getJobletContext().getJobId(), - Event.RUNTIME_DEREGISTERED, null), null); + Event.RUNTIME_DEREGISTERED, null, ""), null); } catch (Exception e) { - LOGGER.log(Level.INFO, "deinitialize() failed on ActiveSourceOperatorNodePushable", e); + LOGGER.info("ingestion op stopped w/ failure", e); throw HyracksDataException.create(e); } finally { - LOGGER.log(Level.INFO, "deinitialize() returning on ActiveSourceOperatorNodePushable"); + LOGGER.info("ingestion op stopped"); } } @@ -134,4 +133,12 @@ public abstract class ActiveSourceOperatorNodePushable extends AbstractUnaryOutp public final IFrameWriter getInputFrameWriter(int index) { return null; } + + private void logIngestionFailure(Exception e) { + if (e.getCause() instanceof InterruptedException) { + LOGGER.info("ingestion op interrupted", e); + } else { + LOGGER.info("ingestion op failed", e); + } + } } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java index bad3f79f3d..692bbb9800 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveManagerMessage.java @@ -34,15 +34,17 @@ public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddr GENERIC_EVENT } - private static final long serialVersionUID = 2L; + private static final long serialVersionUID = 3L; private final Kind kind; private final ActiveRuntimeId runtimeId; private final Serializable payload; + private final String desc; - public ActiveManagerMessage(Kind kind, ActiveRuntimeId runtimeId, Serializable payload) { + public ActiveManagerMessage(Kind kind, ActiveRuntimeId runtimeId, Serializable payload, String desc) { this.kind = kind; this.runtimeId = runtimeId; this.payload = payload; + this.desc = desc; } public Serializable getPayload() { @@ -57,13 +59,18 @@ public class ActiveManagerMessage extends CcIdentifiedMessage implements INcAddr return kind; } + public String getDesc() { + return desc; + } + @Override public void handle(INcApplicationContext appCtx) throws HyracksDataException, InterruptedException { - ((ActiveManager) appCtx.getActiveManager()).submit(this); + ((ActiveManager) appCtx.getActiveManager()).handle(this); } @Override public String toString() { - return getClass().getSimpleName() + "{" + "kind=" + kind + ", runtimeId=" + runtimeId + '}'; + return getClass().getSimpleName() + "{kind=" + kind + ", runtimeId=" + runtimeId + + (desc != null && !desc.isEmpty() ? ", desc=" + desc : "") + '}'; } } diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java index cb9c61b2e9..2955271afc 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActivePartitionMessage.java @@ -29,23 +29,27 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; public class ActivePartitionMessage implements ICcAddressedMessage { + public enum Event { RUNTIME_REGISTERED, RUNTIME_DEREGISTERED, GENERIC_EVENT } - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; private final ActiveRuntimeId activeRuntimeId; private final JobId jobId; private final Serializable payload; + private final String desc; private final Event event; - public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, Event event, Serializable payload) { + public ActivePartitionMessage(ActiveRuntimeId activeRuntimeId, JobId jobId, Event event, Serializable payload, + String desc) { this.activeRuntimeId = activeRuntimeId; this.jobId = jobId; this.event = event; this.payload = payload; + this.desc = desc; } public ActiveRuntimeId getActiveRuntimeId() { @@ -64,6 +68,10 @@ public class ActivePartitionMessage implements ICcAddressedMessage { return event; } + public String getDesc() { + return desc; + } + @Override public void handle(ICcApplicationContext appCtx) throws HyracksDataException, InterruptedException { IActiveNotificationHandler activeListener = (IActiveNotificationHandler) appCtx.getActiveNotificationHandler(); @@ -72,7 +80,7 @@ public class ActivePartitionMessage implements ICcAddressedMessage { @Override public String toString() { - return activeRuntimeId + ":" + ActivePartitionMessage.class.getSimpleName() + '-' + event; + return activeRuntimeId + ":" + ActivePartitionMessage.class.getSimpleName() + '-' + event + '(' + desc + ')'; } @Override diff --git a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java index 94668a0a3e..2e5a57186f 100644 --- a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java +++ b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/message/ActiveStatsRequestMessage.java @@ -25,7 +25,7 @@ public class ActiveStatsRequestMessage extends ActiveManagerMessage { private final long reqId; public ActiveStatsRequestMessage(ActiveRuntimeId runtimeId, long reqId) { - super(Kind.REQUEST_STATS, runtimeId, null); + super(Kind.REQUEST_STATS, runtimeId, null, ""); this.reqId = reqId; } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java index 58a37bd665..021c3fb5c9 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/RebalanceApiServlet.java @@ -261,7 +261,7 @@ public class RebalanceApiServlet extends AbstractServlet { try { ActiveNotificationHandler activeNotificationHandler = (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); - activeNotificationHandler.suspend(metadataProvider); + activeNotificationHandler.suspend(metadataProvider, "rebalance api"); try { IMetadataLockManager lockManager = appCtx.getMetadataLockManager(); lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), dataverseName, diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java index 626b93828e..0240518877 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveEntityEventsListener.java @@ -138,7 +138,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl } protected synchronized void setState(ActivityState newState) { - LOGGER.log(level, "State of {} is being set to {} from {}", getEntityId(), newState, state); + LOGGER.log(level, "state of {} is being set from {} to {}", getEntityId(), state, newState); this.prevState = state; this.state = newState; if (newState == ActivityState.STARTING || newState == ActivityState.RECOVERING @@ -153,9 +153,8 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void notify(ActiveEvent event) { try { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "EventListener is notified."); - } + LOGGER.debug("CC handling event {}; state={}, prev state={}, suspended={}", event, state, prevState, + suspended); ActiveEvent.Kind eventKind = event.getEventKind(); switch (eventKind) { case JOB_CREATED: @@ -194,26 +193,21 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @SuppressWarnings("unchecked") protected void finish(ActiveEvent event) throws HyracksDataException { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Active job {} finished", jobId); - } JobId lastJobId = jobId; + Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); if (numRegistered != numDeRegistered) { LOGGER.log(Level.WARN, - "Active job {} finished with reported runtime registrations = {} and deregistrations = {}", jobId, - numRegistered, numDeRegistered); + "ingestion job {} finished with status={}, reported runtime registrations={}, deregistrations={}", + jobId, status, numRegistered, numDeRegistered); } jobId = null; - Pair<JobStatus, List<Exception>> status = (Pair<JobStatus, List<Exception>>) event.getEventObject(); JobStatus jobStatus = status.getLeft(); List<Exception> exceptions = status.getRight(); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Active job {} finished with status {}", lastJobId, jobStatus); - } + LOGGER.debug("ingestion job {} finished with status {}", lastJobId, jobStatus); if (!jobSuccessfullyTerminated(jobStatus)) { jobFailure = exceptions.isEmpty() ? new RuntimeDataException(ErrorCode.UNREPORTED_TASK_FAILURE_EXCEPTION) : exceptions.get(0); - LOGGER.error("Active Job {} failed", lastJobId, jobFailure); + LOGGER.error("ingestion job {} failed", lastJobId, jobFailure); setState((state == ActivityState.STOPPING || state == ActivityState.CANCELLING) ? ActivityState.STOPPED : ActivityState.TEMPORARILY_FAILED); if (prevState == ActivityState.RUNNING) { @@ -371,16 +365,14 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public synchronized void recover() { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Recover is called on {}", entityId); - } if (retryPolicyFactory == NoRetryPolicyFactory.INSTANCE) { - LOGGER.log(level, "But it has no recovery policy, so it is set to permanent failure"); + LOGGER.debug("recover is called on {} w/o recovery policy; setting to permanent failure", entityId); setState(ActivityState.STOPPED); } else { + LOGGER.debug("recover is called on {}", entityId); ExecutorService executor = appCtx.getServiceContext().getControllerService().getExecutor(); setState(ActivityState.TEMPORARILY_FAILED); - LOGGER.log(level, "Recovery task has been submitted"); + LOGGER.debug("recovery task has been submitted"); rt = createRecoveryTask(); executor.submit(rt.recover()); } @@ -479,15 +471,11 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl // Note: once we start sending stop messages, we can't go back until the entity is stopped final String nameBefore = Thread.currentThread().getName(); try { - Thread.currentThread().setName(nameBefore + " : WaitForCompletionForJobId: " + jobId); + Thread.currentThread().setName(nameBefore + " : wait-for-ingestion-completion: " + jobId); sendStopMessages(metadataProvider, timeout, unit); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Waiting for its state to become " + waitFor); - } + LOGGER.debug("waiting for {} to become {}", jobId, waitFor); subscriber.sync(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Disconnect has been completed " + waitFor); - } + LOGGER.debug("disconnect has been completed {}", waitFor); } catch (InterruptedException ie) { forceStop(subscriber, ie); Thread.currentThread().interrupt(); @@ -517,12 +505,9 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl LOGGER.log(Level.INFO, "Sending stop messages to " + runtimeLocations); } for (String location : runtimeLocations.getLocations()) { - if (LOGGER.isInfoEnabled()) { - LOGGER.log(Level.INFO, "Sending to " + location); - } ActiveRuntimeId runtimeId = getActiveRuntimeId(partition++); messageBroker.sendApplicationMessageToNC(new ActiveManagerMessage(ActiveManagerMessage.Kind.STOP_ACTIVITY, - runtimeId, new StopRuntimeParameters(timeout, unit)), location); + runtimeId, new StopRuntimeParameters(timeout, unit), ""), location); } } @@ -736,7 +721,7 @@ public abstract class ActiveEntityEventsListener implements IActiveEntityControl @Override public String toString() { - return "{\"class\":\"" + getClass().getSimpleName() + "\"," + "\"entityId\":\"" + entityId + "\"," - + "\"state\":\"" + state + "\"" + "}"; + return "{\"class\":\"" + getClass().getSimpleName() + "\", \"entityId\":\"" + entityId + "\", \"state\":\"" + + state + "\", \"prev state\":\"" + prevState + "\", \"suspended\":" + suspended + "}"; } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java index 3c277d5aa6..8821c674ec 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java @@ -72,19 +72,17 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active EntityId entityId = jobId2EntityId.get(jobId); if (entityId != null) { IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Next event is {} for job {}", eventKind, jobId); - } if (eventKind == Kind.JOB_FINISHED) { - LOGGER.log(level, "Removing job {}", jobId); + LOGGER.debug("removing ingestion job {}", jobId); jobId2EntityId.remove(jobId); } if (listener != null) { - LOGGER.log(level, "Notifying the listener"); listener.notify(event); + } else { + LOGGER.debug("listener not found for entity {} on event={}", entityId, event); } } else { - LOGGER.log(Level.ERROR, "Entity not found for event {} for job {}", eventKind, jobId); + LOGGER.error("entity not found for event {}", event); } } @@ -92,45 +90,30 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public void notifyJobCreation(JobId jobId, JobSpecification jobSpecification) throws HyracksDataException { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "notifyJobCreation was called for job {}", jobId); - } Object property = jobSpecification.getProperty(ACTIVE_ENTITY_PROPERTY_NAME); if (!(property instanceof EntityId)) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Job {} is not of type active job. property found to be {}", jobId, property); + if (property != null) { + LOGGER.debug("{} is not an active job. job property={}", jobId, property); } return; } + LOGGER.debug("notified of ingestion job creation {}", jobId); EntityId entityId = (EntityId) property; monitorJob(jobId, entityId); - boolean found = jobId2EntityId.get(jobId) != null; - LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive")); add(new ActiveEvent(jobId, Kind.JOB_CREATED, entityId, jobSpecification)); } private synchronized void monitorJob(JobId jobId, EntityId entityId) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "monitorJob was called for job {}", jobId); - } - boolean found = jobId2EntityId.get(jobId) != null; - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Job {} was found to be {}", jobId, (found ? "Active" : "Inactive")); - } + boolean found = jobId2EntityId.containsKey(jobId); + LOGGER.debug("{} is {}", jobId, (found ? "active" : "inactive")); if (entityEventListeners.containsKey(entityId)) { - if (jobId2EntityId.containsKey(jobId)) { - if (LOGGER.isErrorEnabled()) { - LOGGER.error("Job {} is already being monitored", jobId); - } + if (found) { + LOGGER.error("{} is already being monitored", jobId); return; } - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Monitoring started for job {}", jobId); - } + LOGGER.debug("monitoring started for {}", jobId); } else { - if (LOGGER.isEnabled(level)) { - LOGGER.info("No listener was found for the entity {} for job {}", entityId, jobId); - } + LOGGER.debug("no listener found for entity {}; {}", entityId, jobId); } jobId2EntityId.put(jobId, entityId); } @@ -141,22 +124,18 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (entityId != null) { add(new ActiveEvent(jobId, Kind.JOB_STARTED, entityId, null)); } + // else must be non-active job, e.g. a job for a query } @Override public synchronized void notifyJobFinish(JobId jobId, JobStatus jobStatus, List<Exception> exceptions) throws HyracksException { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Getting notified of job finish for job {}", jobId); - } EntityId entityId = jobId2EntityId.get(jobId); if (entityId != null) { + LOGGER.debug("notified of ingestion job finish {}", jobId); add(new ActiveEvent(jobId, Kind.JOB_FINISHED, entityId, Pair.of(jobStatus, exceptions))); - } else { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "No need to notify JOB_FINISHED for job {}", jobId); - } } + // else must be non-active job, e.g. a job for a query } // *** IActiveNotificationHandler @@ -169,13 +148,6 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active @Override public IActiveEntityEventsListener getListener(EntityId entityId) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "getActiveEntityListener was called with entity {}", entityId); - } - IActiveEntityEventsListener listener = entityEventListeners.get(entityId); - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Listener found: {}", listener); - } return entityEventListeners.get(entityId); } @@ -197,9 +169,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "registerListener was called for the entity {}", listener.getEntityId()); - } + LOGGER.debug("register listener for entity {}, state={}", listener.getEntityId(), listener.getState()); if (entityEventListeners.containsKey(listener.getEntityId())) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_IS_ALREADY_REGISTERED, listener.getEntityId()); } @@ -211,9 +181,7 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_NOTIFICATION_HANDLER_IS_SUSPENDED); } - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "unregisterListener was called for the entity {}", listener.getEntityId()); - } + LOGGER.debug("unregister listener for entity {}, state={}", listener.getEntityId(), listener.getState()); IActiveEntityEventsListener registeredListener = entityEventListeners.remove(listener.getEntityId()); if (registeredListener == null) { throw new RuntimeDataException(ErrorCode.ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED, listener.getEntityId()); @@ -229,20 +197,19 @@ public class ActiveNotificationHandler extends SingleThreadEventProcessor<Active LOGGER.info("Starting active recovery"); for (IActiveEntityEventsListener listener : getEventListeners()) { synchronized (listener) { - if (LOGGER.isEnabled(level)) { - LOGGER.log(level, "Entity {} is {}", listener.getEntityId(), listener.getState()); - } + LOGGER.debug("entity {} is {}, active={}, suspended={}", listener.getEntityId(), listener.getState(), + listener.isActive(), listener.isSuspended()); listener.notifyAll(); } } } - public void suspend(MetadataProvider mdProvider) throws HyracksDataException { + public void suspend(MetadataProvider mdProvider, String reason) throws HyracksDataException { synchronized (this) { if (suspended) { throw new RuntimeDataException(ErrorCode.ACTIVE_EVENT_HANDLER_ALREADY_SUSPENDED); } - LOGGER.log(level, "Suspending active events handler"); + LOGGER.debug("suspending active events handler. reason {}", reason); suspended = true; } Collection<IActiveEntityEventsListener> registeredListeners = entityEventListeners.values(); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java index 65d1039aff..65700413b1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryRequest.java @@ -69,10 +69,8 @@ public class CancelQueryRequest implements ICcAddressedMessage { } } } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Sending CancelQueryResponse to {}. requestId:{}, uuid:{}, contextId:{}, status:{}", nodeId, - requestId, uuid, contextId, status); - } + LOGGER.debug("sending CancelQueryResponse to {}. reqId:{}, uuid:{}, contextId:{}, status:{}", nodeId, requestId, + uuid, contextId, status); CancelQueryResponse response = new CancelQueryResponse(reqId, status); CCMessageBroker messageBroker = (CCMessageBroker) appCtx.getServiceContext().getMessageBroker(); try { @@ -82,4 +80,9 @@ public class CancelQueryRequest implements ICcAddressedMessage { } } + @Override + public String toString() { + return "CancelQueryRequest{from='" + nodeId + "', reqId=" + reqId + ", uuid='" + uuid + "', contextId='" + + contextId + "'}"; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java index d65ae316ee..68d34303eb 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/CancelQueryResponse.java @@ -49,4 +49,8 @@ public class CancelQueryResponse implements INcAddressedMessage { return status; } + @Override + public String toString() { + return "CancelQueryResponse{reqId=" + reqId + ", status=" + status + '}'; + } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java index e31417798b..03ba4a9191 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementRequestMessage.java @@ -150,7 +150,8 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { ILangCompilationProvider compilationProvider = ccExtMgr.getCompilationProvider(lang); IStorageComponentProvider storageComponentProvider = ccAppCtx.getStorageComponentProvider(); IStatementExecutorFactory statementExecutorFactory = ccApp.getStatementExecutorFactory(); - ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); + ExecuteStatementResponseMessage responseMsg = + new ExecuteStatementResponseMessage(requestMessageId, clientContextID, requestReference.getUuid()); final IStatementExecutor.StatementProperties statementProperties = new IStatementExecutor.StatementProperties(); responseMsg.setStatementProperties(statementProperties); try { @@ -230,9 +231,10 @@ public class ExecuteStatementRequestMessage implements ICcAddressedMessage { return null; } - protected static void sendRejection(Exception reason, CCMessageBroker messageBroker, long requestMessageId, + protected void sendRejection(Exception reason, CCMessageBroker messageBroker, long requestMessageId, String requestNodeId) { - ExecuteStatementResponseMessage responseMsg = new ExecuteStatementResponseMessage(requestMessageId); + ExecuteStatementResponseMessage responseMsg = + new ExecuteStatementResponseMessage(requestMessageId, clientContextID, requestReference.getUuid()); responseMsg.setError(reason); try { messageBroker.sendApplicationMessageToNC(responseMsg, requestNodeId); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java index 2cdede1252..eaadebe3b2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/message/ExecuteStatementResponseMessage.java @@ -31,9 +31,12 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.Warning; public final class ExecuteStatementResponseMessage implements INcAddressedMessage { - private static final long serialVersionUID = 1L; + + private static final long serialVersionUID = 2L; private final long requestMessageId; + private final String clientContextID; + private final String uuid; private String result; @@ -49,8 +52,10 @@ public final class ExecuteStatementResponseMessage implements INcAddressedMessag private Collection<Warning> warnings; - public ExecuteStatementResponseMessage(long requestMessageId) { + public ExecuteStatementResponseMessage(long requestMessageId, String clientContextID, String uuid) { this.requestMessageId = requestMessageId; + this.clientContextID = clientContextID; + this.uuid = uuid; } @Override @@ -120,7 +125,7 @@ public final class ExecuteStatementResponseMessage implements INcAddressedMessag @Override public String toString() { - return String.format("%s(id=%s): %d characters", getClass().getSimpleName(), requestMessageId, - result != null ? result.length() : 0); + return String.format("%s(id=%s, uuid=%s, clientContextId=%s): %d characters", getClass().getSimpleName(), + requestMessageId, uuid, clientContextID, result != null ? result.length() : 0); } } diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java index bee5ff9c26..9d378f5a41 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/message/RegistrationTasksResponseMessage.java @@ -100,4 +100,9 @@ public class RegistrationTasksResponseMessage extends CcIdentifiedMessage public MessageType getType() { return MessageType.REGISTRATION_TASKS_RESPONSE; } + + @Override + public String toString() { + return "RegistrationTasksResponseMessage{from='" + nodeId + '}'; + } } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java index 06380fe2b3..3449ee3e65 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/common/TestDataUtil.java @@ -162,7 +162,7 @@ public class TestDataUtil { try { ActiveNotificationHandler activeNotificationHandler = (ActiveNotificationHandler) ccAppCtx.getActiveNotificationHandler(); - activeNotificationHandler.suspend(metadataProvider); + activeNotificationHandler.suspend(metadataProvider, ""); try { IMetadataLockManager lockManager = ccAppCtx.getMetadataLockManager(); lockManager.acquireDatasetExclusiveModificationLock(metadataProvider.getLocks(), dataverseName, diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java index cb123bfb92..126edb157c 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/ActiveStatsTest.java @@ -146,7 +146,7 @@ public class ActiveStatsTest { Assert.assertTrue(requestedStats.contains("N/A")); // Fake partition message and notify eventListener ActivePartitionMessage partitionMessage = - new ActivePartitionMessage(activeRuntimeId, jobId, Event.RUNTIME_REGISTERED, null); + new ActivePartitionMessage(activeRuntimeId, jobId, Event.RUNTIME_REGISTERED, null, ""); partitionMessage.handle(appCtx); start.sync(); if (start.hasFailed()) { diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java index 0c4b806bee..0eba38158f 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/RuntimeRegistration.java @@ -49,7 +49,7 @@ public class RuntimeRegistration extends Action { subscriber.beforeExecute(); } ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage( - new ActiveRuntimeId(entityId, nc.getId(), partition), jobId, Event.RUNTIME_REGISTERED, null)); + new ActiveRuntimeId(entityId, nc.getId(), partition), jobId, Event.RUNTIME_REGISTERED, null, "")); nc.getClusterController().activeEvent(event); } diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java index f849f08386..fcd4d854f7 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestNodeControllerActor.java @@ -92,7 +92,7 @@ public class TestNodeControllerActor extends Actor { subscriber.beforeExecute(); } ActiveEvent event = new ActiveEvent(jobId, Kind.PARTITION_EVENT, entityId, new ActivePartitionMessage( - new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_DEREGISTERED, null)); + new ActiveRuntimeId(entityId, id, partition), jobId, Event.RUNTIME_DEREGISTERED, null, "")); clusterController.activeEvent(event); } }; diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java index 2d64f79a96..fa2c43ec53 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/TestUserActor.java @@ -263,7 +263,7 @@ public class TestUserActor extends Actor { Action action = new Action() { @Override protected void doExecute(MetadataProvider mdProvider) throws Exception { - handler.suspend(mdProvider); + handler.suspend(mdProvider, ""); } }; add(action); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 4fc9dd6728..507a2271e2 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -150,7 +150,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC IndexInfo iInfo = dsr == null ? null : dsr.getIndexInfo(resourceID); if (dsr == null || iInfo == null) { - throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST); + throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath); } PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(iInfo.getPartition()); diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java index f56e5c0a16..a098a29d9c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java @@ -56,7 +56,6 @@ import org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReferenc import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId; import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils; import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -175,11 +174,11 @@ public class LSMIOOperationCallback implements ILSMIOOperationCallback { synchronized (lsmIndex.getOperationTracker()) { List<ILSMDiskComponent> diskComponents = lsmIndex.getDiskComponents(); if (diskComponents.isEmpty()) { - LOGGER.log(Level.INFO, "There are no disk components"); + LOGGER.info("there are no disk components for {}", lsmIndex); return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID; } if (deletedComponents.contains(diskComponents.get(diskComponents.size() - 1))) { - LOGGER.log(Level.INFO, "All disk components have been deleted"); + LOGGER.info("all disk components have been deleted for {}", lsmIndex); return LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID; } int mostRecentComponentIndex = 0; diff --git a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties index 074245c95a..4835073eae 100644 --- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties +++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties @@ -282,7 +282,7 @@ 3006 = Illegal field %1$s in closed type %2$s 3007 = Twitter4J library not found! 3008 = Unable to ingest data -3009 = Exception in get record type %1$s for feed +3009 = Exception in get record type %1$s for ingestion 3010 = Does not support Hive data with list of non-primitive types 3011 = Cannot get hive type for field of type %1$s 3012 = Failed to get columns of record @@ -379,7 +379,7 @@ 3105 = %1$s is already registered 3106 = %1$s is not registered 3107 = Active Notification Handler is already suspended -3110 = Feed failed while reading a new record +3110 = Ingestion failed while reading a new record 3111 = Feed %1$s is not connected to any dataset 3112 = Array/Multiset item cannot be null 3113 = Failed to parse record diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java index 4279ebd712..efa93ab4c0 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/dataflow/FeedRecordDataFlowController.java @@ -98,7 +98,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } } } catch (HyracksDataException e) { - LOGGER.log(Level.WARN, "Exception during ingestion", e); + logFailure(e); if (e.matches(ErrorCode.FEED_FAILED_WHILE_GETTING_A_NEW_RECORD)) { // Failure but we know we can for sure push the previously parsed records safely failure = e; @@ -113,7 +113,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } } catch (Throwable e) { failure = e; - LOGGER.log(Level.WARN, "Failure while operating a feed source", e); + logFailure(e); } finally { failure = finish(failure); } @@ -126,7 +126,7 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl } private synchronized void setState(State newState) { - LOGGER.log(Level.INFO, "State is being set from " + state + " to " + newState); + LOGGER.info("controller is being set from {} to {} ", state, newState); state = newState; } @@ -289,4 +289,12 @@ public class FeedRecordDataFlowController<T> extends AbstractFeedDataFlowControl public void handleGenericEvent(ActiveManagerMessage event) { recordReader.handleGenericEvent(event); } + + private void logFailure(Throwable th) { + if (th instanceof InterruptedException || th.getCause() instanceof InterruptedException) { + LOGGER.warn("data flow controller interrupted", th); + } else { + LOGGER.warn("data flow controller failed", th); + } + } } diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 7db5d493a1..c46558a8c4 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -121,7 +121,7 @@ 101 = Page %1$s does not exist in file %2$s 102 = Failed to open virtual buffer cache since it is already open 103 = Failed to close virtual buffer cache since it is already closed -104 = Index does not exist +104 = Index does not exist (%1$s) 105 = Cannot drop in-use index (%1$s) 106 = Failed to deactivate the bloom filter since it is pinned by other users 107 = The given search predicate can't be null. diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java index 771832eeea..6630ba72b4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ApplicationMessageWork.java @@ -25,6 +25,7 @@ import org.apache.hyracks.api.deployment.DeploymentId; import org.apache.hyracks.api.messages.IMessage; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.deployment.DeploymentUtils; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,8 +35,8 @@ import org.apache.logging.log4j.Logger; public class ApplicationMessageWork extends AbstractHeartbeatWork { private static final Logger LOGGER = LogManager.getLogger(); - private byte[] message; - private DeploymentId deploymentId; + private final byte[] message; + private final DeploymentId deploymentId; public ApplicationMessageWork(ClusterControllerService ccs, byte[] message, DeploymentId deploymentId, String nodeId) { @@ -56,6 +57,11 @@ public class ApplicationMessageWork extends AbstractHeartbeatWork { } } + @Override + public Level logLevel() { + return Level.TRACE; + } + @Override public String toString() { return getName() + ": nodeID: " + nodeId; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java index c36b887c28..f08e209444 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetNodeControllersInfoWork.java @@ -24,10 +24,12 @@ import org.apache.hyracks.api.client.NodeControllerInfo; import org.apache.hyracks.control.cc.cluster.INodeManager; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.common.work.IResultCallback; +import org.apache.logging.log4j.Level; public class GetNodeControllersInfoWork extends AbstractWork { + private final INodeManager nodeManager; - private IResultCallback<Map<String, NodeControllerInfo>> callback; + private final IResultCallback<Map<String, NodeControllerInfo>> callback; public GetNodeControllersInfoWork(INodeManager nodeManager, IResultCallback<Map<String, NodeControllerInfo>> callback) { @@ -39,4 +41,9 @@ public class GetNodeControllersInfoWork extends AbstractWork { public void run() { callback.setValue(nodeManager.getNodeControllerInfoMap()); } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java index bf95ff236b..b7dbd7546e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/GetResultDirectoryAddressWork.java @@ -22,8 +22,10 @@ import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.logging.log4j.Level; public class GetResultDirectoryAddressWork extends SynchronizableWork { + private final ClusterControllerService ccs; private final IResultCallback<NetworkAddress> callback; @@ -42,4 +44,9 @@ public class GetResultDirectoryAddressWork extends SynchronizableWork { callback.setException(e); } } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java index 77d2f82f9c..6262c475e5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java @@ -32,13 +32,14 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class JobCleanupWork extends AbstractWork { + private static final Logger LOGGER = LogManager.getLogger(); - private IJobManager jobManager; - private JobId jobId; - private JobStatus status; - private List<Exception> exceptions; - private IResultCallback<Void> callback; + private final IJobManager jobManager; + private final JobId jobId; + private final JobStatus status; + private final List<Exception> exceptions; + private final IResultCallback<Void> callback; public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus status, List<Exception> exceptions, IResultCallback<Void> callback) { @@ -51,12 +52,10 @@ public class JobCleanupWork extends AbstractWork { @Override public void run() { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Cleanup for job: {}", jobId); - } + LOGGER.info("cleaning up {} on NCs, status={}", jobId, status); final JobRun jobRun = jobManager.get(jobId); if (jobRun == null) { - LOGGER.debug("Ignoring cleanup for unknown job: {}", jobId); + LOGGER.debug("ignoring cleanup for unknown {}", jobId); return; } try { diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java index cfedfc9d02..86e36d6f6e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobStartWork.java @@ -35,8 +35,13 @@ import org.apache.hyracks.control.cc.job.JobRun; import org.apache.hyracks.control.common.deployment.DeploymentUtils; import org.apache.hyracks.control.common.work.IResultCallback; import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class JobStartWork extends SynchronizableWork { + + private static final Logger LOGGER = LogManager.getLogger(); private final ClusterControllerService ccs; private final byte[] acggfBytes; private final Set<JobFlag> jobFlags; @@ -67,6 +72,7 @@ public class JobStartWork extends SynchronizableWork { JobId jobId; JobRun run; jobId = jobIdFactory.create(); + LOGGER.debug("created {}", jobId); if (deployedJobSpecId == null) { //Need to create the ActivityClusterGraph IActivityClusterGraphGeneratorFactory acggf = (IActivityClusterGraphGeneratorFactory) DeploymentUtils @@ -85,4 +91,9 @@ public class JobStartWork extends SynchronizableWork { callback.setException(e); } } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java index c3a09f9da3..76a72c0da3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobletCleanupNotificationWork.java @@ -35,6 +35,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class JobletCleanupNotificationWork extends AbstractHeartbeatWork { + private static final Logger LOGGER = LogManager.getLogger(); private final JobId jobId; @@ -46,6 +47,7 @@ public class JobletCleanupNotificationWork extends AbstractHeartbeatWork { @Override public void runWork() { + LOGGER.debug("node {} finished job clean-up {}", nodeId, jobId); IJobManager jobManager = ccs.getJobManager(); final JobRun run = jobManager.get(jobId); if (run == null) { @@ -82,4 +84,9 @@ public class JobletCleanupNotificationWork extends AbstractHeartbeatWork { public String toString() { return getName() + " jobId:" + jobId + ", nodeId:" + nodeId; } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java index ec21785baa..810fda24e4 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterNodeWork.java @@ -31,6 +31,7 @@ import org.apache.hyracks.control.common.controllers.NodeParameters; import org.apache.hyracks.control.common.controllers.NodeRegistration; import org.apache.hyracks.control.common.ipc.NodeControllerRemoteProxy; import org.apache.hyracks.control.common.work.SynchronizableWork; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -83,4 +84,9 @@ public class RegisterNodeWork extends SynchronizableWork { nc.sendRegistrationResult(params, e); } } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java index ee10669952..cd9b6d06b1 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RemoveDeadNodesWork.java @@ -50,9 +50,7 @@ public class RemoveDeadNodesWork extends AbstractWork { Collection<JobId> affectedJobIds = result.getRight(); int size = affectedJobIds.size(); if (size > 0) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Number of affected jobs: " + size); - } + LOGGER.info("number of affected jobs due to dead nodes removal {}", size); IJobManager jobManager = ccs.getJobManager(); for (JobId jobId : affectedJobIds) { JobRun run = jobManager.get(jobId); diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java index 0c531422ff..869caa5506 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskCompleteWork.java @@ -29,8 +29,13 @@ import org.apache.hyracks.control.cc.job.TaskAttempt; import org.apache.hyracks.control.common.job.profiling.om.JobProfile; import org.apache.hyracks.control.common.job.profiling.om.JobletProfile; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class TaskCompleteWork extends AbstractTaskLifecycleWork { + + private static final Logger LOGGER = LogManager.getLogger(); private final TaskProfile statistics; public TaskCompleteWork(ClusterControllerService ccs, JobId jobId, TaskAttemptId taId, String nodeId, @@ -44,8 +49,10 @@ public class TaskCompleteWork extends AbstractTaskLifecycleWork { IJobManager jobManager = ccs.getJobManager(); JobRun run = jobManager.get(jobId); if (run == null) { + LOGGER.debug("node completed task for unknown job {}:{}:{}", nodeId, jobId, taId); return; } + LOGGER.debug("node completed task {}:{}:{}", nodeId, jobId, taId); if (statistics != null) { JobProfile jobProfile = run.getJobProfile(); Map<String, JobletProfile> jobletProfiles = jobProfile.getJobletProfiles(); @@ -63,4 +70,9 @@ public class TaskCompleteWork extends AbstractTaskLifecycleWork { public String toString() { return getName() + ": [" + nodeId + "[" + jobId + ":" + taId + "]"; } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java index 6d4f1730b5..f386a89579 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/ApplicationMessageWork.java @@ -29,11 +29,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class ApplicationMessageWork extends AbstractWork { + private static final Logger LOGGER = LogManager.getLogger(); - private byte[] message; - private DeploymentId deploymentId; - private String nodeId; - private NodeControllerService ncs; + private final byte[] message; + private final DeploymentId deploymentId; + private final String nodeId; + private final NodeControllerService ncs; public ApplicationMessageWork(NodeControllerService ncs, byte[] message, DeploymentId deploymentId, String nodeId) { this.ncs = ncs; @@ -58,6 +59,11 @@ public class ApplicationMessageWork extends AbstractWork { } } + @Override + public Level logLevel() { + return Level.TRACE; + } + @Override public String toString() { return getName() + ": nodeId: " + nodeId; diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java index 75edd38b00..efc8467ca9 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/CleanupJobletWork.java @@ -25,10 +25,12 @@ import org.apache.hyracks.api.job.JobStatus; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.Joblet; import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class CleanupJobletWork extends AbstractWork { + private static final Logger LOGGER = LogManager.getLogger(); private final NodeControllerService ncs; @@ -45,7 +47,7 @@ public class CleanupJobletWork extends AbstractWork { @Override public void run() { - LOGGER.debug("cleaning up after job: {}", jobId); + LOGGER.debug("cleaning up {}", jobId); ncs.removeJobParameterByteStore(jobId); ncs.getPartitionManager().jobCompleted(jobId, status); Map<JobId, Joblet> jobletMap = ncs.getJobletMap(); @@ -59,4 +61,9 @@ public class CleanupJobletWork extends AbstractWork { public String toString() { return getName() + " jobId:" + jobId + ", status:" + status; } + + @Override + public Level logLevel() { + return Level.TRACE; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java index 60860c5afe..7e1b6f5d5f 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskCompleteWork.java @@ -18,6 +18,8 @@ */ package org.apache.hyracks.control.nc.work; +import org.apache.hyracks.api.dataflow.TaskAttemptId; +import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.common.job.profiling.om.TaskProfile; import org.apache.hyracks.control.common.work.AbstractWork; import org.apache.hyracks.control.nc.NodeControllerService; @@ -38,20 +40,27 @@ public class NotifyTaskCompleteWork extends AbstractWork { @Override public void run() { - TaskProfile taskProfile = new TaskProfile(task.getTaskAttemptId(), task.getPartitionSendProfile(), + JobId jobId = task.getJoblet().getJobId(); + TaskAttemptId taskAttemptId = task.getTaskAttemptId(); + LOGGER.debug("notifying CC of task complete {}:{}", jobId, taskAttemptId); + TaskProfile taskProfile = new TaskProfile(taskAttemptId, task.getPartitionSendProfile(), task.getStatsCollector(), task.getWarnings(), task.getWarningCollector().getTotalWarningsCount()); try { - ncs.getClusterController(task.getJobletContext().getJobId().getCcId()).notifyTaskComplete( - task.getJobletContext().getJobId(), task.getTaskAttemptId(), ncs.getId(), taskProfile); + ncs.getClusterController(task.getJobletContext().getJobId().getCcId()) + .notifyTaskComplete(task.getJobletContext().getJobId(), taskAttemptId, ncs.getId(), taskProfile); } catch (Exception e) { - LOGGER.log(Level.ERROR, "Failed notifying task complete for " + task.getTaskAttemptId(), e); + LOGGER.log(Level.ERROR, "Failed notifying task complete for {}", taskAttemptId, e); } task.getJoblet().removeTask(task); } @Override public String toString() { - return getName() + ": [" + ncs.getId() + "[" + task.getJoblet().getJobId() + ":" + task.getTaskAttemptId() - + "]"; + return getName() + ": [" + task.getJoblet().getJobId() + ":" + task.getTaskAttemptId() + "]"; + } + + @Override + public Level logLevel() { + return Level.TRACE; } } diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java index f277046496..dd4a956bd3 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/StartTasksWork.java @@ -318,6 +318,6 @@ public class StartTasksWork extends AbstractWork { @Override public String toString() { - return getName() + " jobId:" + jobId; + return getName() + " jobId:" + jobId + " tasks:" + taskDescriptors.size(); } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java index 16461de93e..b79c3b1ddf 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexDataflowHelper.java @@ -73,8 +73,7 @@ public class IndexDataflowHelper implements IIndexDataflowHelper { // Get local resource LocalResource lr = getResource(); if (lr == null) { - LOGGER.error("index {} does not exist", resourceRef.getRelativePath()); - throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST); + throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourceRef.getRelativePath()); } IResource resource = lr.getResource(); index = resource.createInstance(ctx); diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java index c9505a68af..ab301a9f79 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexLifecycleManager.java @@ -196,7 +196,7 @@ public class IndexLifecycleManager implements IResourceLifecycleManager<IIndex>, public void unregister(String resourcePath) throws HyracksDataException { IndexInfo info = indexInfos.get(resourcePath); if (info == null) { - throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST); + throw HyracksDataException.create(ErrorCode.INDEX_DOES_NOT_EXIST, resourcePath); } if (info.referenceCount != 0) { diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java index 4ee12454cd..966d6d15cc 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java @@ -145,7 +145,7 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl // However, we cannot throw an exception here to be compatible with legacy datasets. // In this case, the disk component would always get a garbage Id [-1, -1], which makes the // component Id-based optimization useless but still correct. - LOGGER.warn("Component Id not found from disk component metadata"); + LOGGER.warn("component id {} not found from disk component metadata {}", componentId, getIndex()); } return componentId; } @@ -160,9 +160,7 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl @Override public void markAsValid(boolean persist, IPageWriteFailureCallback callback) throws HyracksDataException { ComponentUtils.markAsValid(getMetadataHolder(), persist, callback); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Marked as valid component with id: " + getId()); - } + LOGGER.debug("marked {} as valid component with id {}", getIndex(), getId()); } @Override @@ -247,6 +245,7 @@ public abstract class AbstractLSMDiskComponent extends AbstractLSMComponent impl @Override public String toString() { - return "{\"class\":" + getClass().getSimpleName() + "\", \"index\":" + getIndex().toString() + "}"; + return "{\"class\":" + getClass().getSimpleName() + "\", \"id\":" + componentId + ", \"index\":" + getIndex() + + "}"; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java index bf93dc0551..77e5c6e524 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java @@ -676,7 +676,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex { return "{\"class\" : \"" + getClass().getSimpleName() + "\", \"dir\" : \"" + fileManager.getBaseDir() + "\", \"memory\" : " + (memoryComponents == null ? 0 : memoryComponents) + ", \"disk\" : " + diskComponents.size() + ", \"num-scheduled-flushes\":" + numScheduledFlushes - + ", \"current-memory-component\":" + currentMutableComponentId.get() + "}"; + + ", \"current-memory-component\":" + + (currentMutableComponentId == null ? "" : currentMutableComponentId.get()) + "}"; } @Override @@ -858,11 +859,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex { if (!memoryComponent.isModified() || opCtx.getOperation() == IndexOperation.DELETE_COMPONENTS) { return EmptyComponent.INSTANCE; } - if (LOGGER.isInfoEnabled()) { - FlushOperation flushOp = (FlushOperation) operation; - LOGGER.log(Level.INFO, - "Flushing component with id: " + flushOp.getFlushingComponent().getId() + " in the index " + this); - } + LOGGER.debug("flushing component with id {} in the index {}", + ((FlushOperation) operation).getFlushingComponent().getId(), this); return doFlush(operation); } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java index 8d37d97eed..7088791aa7 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java @@ -337,9 +337,7 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im throw new IllegalStateException( this + " receives illegal id. Old id " + this.componentId + ", new id " + componentId); } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Component Id was reset from " + this.componentId + " to " + componentId); - } + LOGGER.debug("component id of {} was reset from {} to {}", getIndex(), this.componentId, componentId); this.componentId = componentId; if (componentId != null) { LSMComponentIdUtils.persist(this.componentId, metadata); @@ -355,6 +353,6 @@ public abstract class AbstractLSMMemoryComponent extends AbstractLSMComponent im public String toString() { return "{\"class\":\"" + getClass().getSimpleName() + "\", \"state\":\"" + state + "\", \"writers\":" + writerCount + ", \"readers\":" + readerCount + ", \"pendingFlushes\":" + pendingFlushes - + ", \"id\":\"" + componentId + "\"}"; + + ", \"id\":\"" + componentId + "\", \"index\":" + getIndex() + "}"; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java index 3ea0f49084..9a112ee4d5 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/EmptyComponent.java @@ -152,4 +152,9 @@ public class EmptyComponent implements ILSMDiskComponent { public int getReaderCount() { return 0; } + + @Override + public String toString() { + return "EmptyComponent"; + } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java index 950a8e5b23..717bcce817 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java @@ -527,18 +527,19 @@ public class LSMHarness implements ILSMHarness { } } } + ILSMDiskComponent newComponent; try { doIo(operation); } finally { - exitComponents(operation.getAccessor().getOpContext(), LSMOperationType.FLUSH, operation.getNewComponent(), + newComponent = operation.getNewComponent(); + exitComponents(operation.getAccessor().getOpContext(), LSMOperationType.FLUSH, newComponent, operation.getStatus() == LSMIOOperationStatus.FAILURE); opTracker.completeOperation(lsmIndex, LSMOperationType.FLUSH, operation.getAccessor().getOpContext().getSearchOperationCallback(), operation.getAccessor().getOpContext().getModificationCallback()); } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Finished the flush operation for index: {}. Result: {}", lsmIndex, operation.getStatus()); - } + LOGGER.debug("Finished the flush operation for {}. Result: {}", + (newComponent == null ? lsmIndex : newComponent), operation.getStatus()); } public void doIo(ILSMIOOperation operation) { @@ -577,24 +578,23 @@ public class LSMHarness implements ILSMHarness { @Override public void merge(ILSMIOOperation operation) throws HyracksDataException { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Started a merge operation for index: {}", lsmIndex); - } + LOGGER.debug("Started a merge operation for index {}", lsmIndex); synchronized (opTracker) { enterComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE); } + ILSMDiskComponent newComponent; try { doIo(operation); } finally { - exitComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE, operation.getNewComponent(), + newComponent = operation.getNewComponent(); + exitComponents(operation.getAccessor().getOpContext(), LSMOperationType.MERGE, newComponent, operation.getStatus() == LSMIOOperationStatus.FAILURE); opTracker.completeOperation(lsmIndex, LSMOperationType.MERGE, operation.getAccessor().getOpContext().getSearchOperationCallback(), operation.getAccessor().getOpContext().getModificationCallback()); } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Finished the merge operation for index: {}. Result: {}", lsmIndex, operation.getStatus()); - } + LOGGER.debug("Finished the merge operation for {}. Result: {}", + (newComponent == null ? lsmIndex : newComponent), operation.getStatus()); } @Override @@ -777,6 +777,8 @@ public class LSMHarness implements ILSMHarness { ioOperation = scheduleFlush(ctx); } else { // since we're not deleting the memory component, we can't delete any previous component + LOGGER.debug("not deleting any components of {} since memory component {} won't be deleted", lsmIndex, + memComponent); return; } } diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java index 1ff9fa8b25..842ec61527 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/util/ComponentUtils.java @@ -34,7 +34,6 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.common.buffercache.IBufferCache; import org.apache.hyracks.storage.common.buffercache.IPageWriteFailureCallback; -import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -80,11 +79,7 @@ public class ComponentUtils { */ public static void get(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { - boolean loggable = LOGGER.isDebugEnabled(); value.reset(); - if (loggable) { - LOGGER.log(Level.DEBUG, "Getting " + key + " from index " + index); - } // Lock the opTracker to ensure index components don't change synchronized (index.getOperationTracker()) { ILSMMemoryComponent cmc = index.getCurrentMemoryComponent(); @@ -92,33 +87,17 @@ public class ComponentUtils { index.getCurrentMemoryComponent().getMetadata().get(key, value); } if (value.getLength() == 0) { - if (loggable) { - LOGGER.log(Level.DEBUG, key + " was not found in mutable memory component of " + index); - } - // was not found in the in current mutable component, search in the other in memory components + // was not found in the in current mutable component, search in the other in-memory components fromImmutableMemoryComponents(index, key, value); if (value.getLength() == 0) { - if (loggable) { - LOGGER.log(Level.DEBUG, key + " was not found in all immmutable memory components of " + index); - } - // was not found in the in all in memory components, search in the disk components + // was not found in all in-memory components, search in the disk components fromDiskComponents(index, key, value); - if (loggable) { - if (value.getLength() == 0) { - LOGGER.log(Level.DEBUG, key + " was not found in all disk components of " + index); - } else { - LOGGER.log(Level.DEBUG, key + " was found in disk components of " + index); - } - } - } else { - if (loggable) { - LOGGER.log(Level.DEBUG, key + " was found in the immutable memory components of " + index); + if (value.getLength() == 0) { + LOGGER.debug("{} was NOT found", key); } } } else { - if (loggable) { - LOGGER.log(Level.DEBUG, key + " was found in mutable memory component of " + index); - } + LOGGER.debug("{} was found in mutable memory component {}", key, cmc); } } } @@ -143,17 +122,11 @@ public class ComponentUtils { private static void fromDiskComponents(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { - boolean loggable = LOGGER.isDebugEnabled(); - if (loggable) { - LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components of " + index); - } for (ILSMDiskComponent c : index.getDiskComponents()) { - if (loggable) { - LOGGER.log(Level.DEBUG, "Getting " + key + " from disk components " + c); - } c.getMetadata().get(key, value); if (value.getLength() != 0) { // Found + LOGGER.debug("{} was found in disk component {}", key, c); return; } } @@ -161,21 +134,10 @@ public class ComponentUtils { private static void fromImmutableMemoryComponents(ILSMIndex index, IValueReference key, ArrayBackedValueStorage value) throws HyracksDataException { - boolean loggable = LOGGER.isDebugEnabled(); - if (loggable) { - LOGGER.log(Level.DEBUG, "Getting " + key + " from immutable memory components of " + index); - } List<ILSMMemoryComponent> memComponents = index.getMemoryComponents(); int numOtherMemComponents = memComponents.size() - 1; int next = index.getCurrentMemoryComponentIndex(); - if (loggable) { - LOGGER.log(Level.DEBUG, index + " has " + numOtherMemComponents + " immutable memory components"); - } for (int i = 0; i < numOtherMemComponents; i++) { - if (loggable) { - LOGGER.log(Level.DEBUG, - "trying to get " + key + " from immutable memory components number: " + (i + 1)); - } next = next - 1; if (next < 0) { next = memComponents.size() - 1; @@ -185,6 +147,7 @@ public class ComponentUtils { c.getMetadata().get(key, value); if (value.getLength() != 0) { // Found + LOGGER.debug("{} was found in immutable memory component {}", key, c); return; } }
