This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit a4307a4de144c790cf8371341e3baefec88ed1b6
Author: Ali Alsuliman <[email protected]>
AuthorDate: Mon Apr 22 22:07:31 2024 +0300

    [NO ISSUE][OTH] Logging improvements for job failure events
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
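    Details:
    - Remove duplicate task-failure logging: the failure log in
      TaskFailureWork (CC) and NotifyTaskFailureWork (NC), the "Received
      failure notification" debug log in JobExecutor, the per-job info log
      in JobCleanupWork, and the duplicate-cleanup warning in JobManager.
    - Include the job id in abort and partition-removal log messages, and
      skip partition-removal logging when there are no partitions.
    - Do not re-log a job failure in ResultDirectoryService when its result
      record is already in the FAILED state.
    - Cleanups: parameterized logging, lambdas/removeIf/computeIfAbsent in
      place of anonymous classes and manual loops, final fields, and typo
      fixes ("lifecyle" -> "lifecycle").
    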
    Change-Id: Ib1c99f00cde31224b0bcb86357c64d9c5404d2e7
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18237
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Ali Alsuliman <[email protected]>
    Reviewed-by: Murtadha Hubail <[email protected]>
---
 .../hyracks/control/cc/executor/JobExecutor.java   | 15 ++---
 .../apache/hyracks/control/cc/job/JobManager.java  |  5 +-
 .../control/cc/partitions/PartitionMatchMaker.java | 71 ++++++----------------
 .../control/cc/result/ResultDirectoryService.java  | 25 +++++---
 .../hyracks/control/cc/work/JobCleanupWork.java    | 13 ++--
 .../hyracks/control/cc/work/TaskFailureWork.java   |  9 +--
 .../control/nc/work/NotifyTaskFailureWork.java     |  4 --
 7 files changed, 48 insertions(+), 94 deletions(-)

diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
index 3574acda5f..631f226820 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/executor/JobExecutor.java
@@ -539,7 +539,7 @@ public class JobExecutor {
 
     private void abortTaskCluster(TaskClusterAttempt tcAttempt,
             TaskClusterAttempt.TaskClusterStatus failedOrAbortedStatus) {
-        LOGGER.trace(() -> "Aborting task cluster: " + tcAttempt.getAttempt());
+        LOGGER.trace("Aborting task cluster: {}", tcAttempt.getAttempt());
         Set<TaskAttemptId> abortTaskIds = new HashSet<>();
         Map<String, List<TaskAttemptId>> abortTaskAttemptMap = new HashMap<>();
         for (TaskAttempt ta : tcAttempt.getTaskAttempts().values()) {
@@ -561,14 +561,12 @@ public class JobExecutor {
             }
         }
         final JobId jobId = jobRun.getJobId();
-        LOGGER.trace(() -> "Abort map for job: " + jobId + ": " + 
abortTaskAttemptMap);
+        LOGGER.trace("Abort map for job: {}: {}", jobId, abortTaskAttemptMap);
         INodeManager nodeManager = ccs.getNodeManager();
         abortTaskAttemptMap.forEach((key, abortTaskAttempts) -> {
             final NodeControllerState node = 
nodeManager.getNodeControllerState(key);
             if (node != null) {
-                if (LOGGER.isTraceEnabled()) {
-                    LOGGER.trace("Aborting: " + abortTaskAttempts + " at " + 
key);
-                }
+                LOGGER.trace("Aborting: {} at {}", abortTaskAttempts, key);
                 try {
                     node.getNodeController().abortTasks(jobId, 
abortTaskAttempts);
                 } catch (Exception e) {
@@ -579,8 +577,8 @@ public class JobExecutor {
         inProgressTaskClusters.remove(tcAttempt.getTaskCluster());
         TaskCluster tc = tcAttempt.getTaskCluster();
         PartitionMatchMaker pmm = jobRun.getPartitionMatchMaker();
-        pmm.removeUncommittedPartitions(tc.getProducedPartitions(), 
abortTaskIds);
-        pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds);
+        pmm.removeUncommittedPartitions(tc.getProducedPartitions(), 
abortTaskIds, jobId);
+        pmm.removePartitionRequests(tc.getRequiredPartitions(), abortTaskIds, 
jobId);
 
         tcAttempt.setStatus(failedOrAbortedStatus);
         tcAttempt.setEndTime(System.currentTimeMillis());
@@ -683,7 +681,6 @@ public class JobExecutor {
      */
     public void notifyTaskFailure(TaskAttempt ta, List<Exception> exceptions) {
         try {
-            LOGGER.debug("Received failure notification for TaskAttempt " + 
ta.getTaskAttemptId());
             TaskAttemptId taId = ta.getTaskAttemptId();
             TaskCluster tc = ta.getTask().getTaskCluster();
             TaskClusterAttempt lastAttempt = findLastTaskClusterAttempt(tc);
@@ -696,7 +693,7 @@ public class JobExecutor {
                 LOGGER.trace(() -> "Marking TaskAttempt " + 
ta.getTaskAttemptId()
                         + " as failed and the number of max re-attempts = " + 
maxReattempts);
                 if (lastAttempt.getAttempt() >= maxReattempts || 
isCancelled()) {
-                    LOGGER.debug(() -> "Aborting the job of " + 
ta.getTaskAttemptId());
+                    LOGGER.debug("Aborting the job:{} of {}", 
jobRun.getJobId(), ta.getTaskAttemptId());
                     abortJob(exceptions, NoOpCallback.INSTANCE);
                     return;
                 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
index 4882f4ac49..33bdd0558a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/job/JobManager.java
@@ -129,7 +129,7 @@ public class JobManager implements IJobManager {
         if (activeRunMap.containsKey(jobId)) {
             JobRun jobRun = activeRunMap.get(jobId);
             // The following call will abort all ongoing tasks and then 
consequently
-            // trigger JobCleanupWork and JobCleanupNotificationWork which 
will update the lifecyle of the job.
+            // trigger JobCleanupWork and JobCleanupNotificationWork which 
will update the lifecycle of the job.
             // Therefore, we do not remove the job out of activeRunMap here.
             jobRun.getExecutor().cancelJob(callback);
             return;
@@ -139,7 +139,7 @@ public class JobManager implements IJobManager {
         if (jobRun != null) {
             List<Exception> exceptions =
                     
Collections.singletonList(HyracksException.create(ErrorCode.JOB_CANCELED, 
jobId));
-            // Since the job has not been executed, we only need to update its 
status and lifecyle here.
+            // Since the job has not been executed, we only need to update its 
status and lifecycle here.
             jobRun.setStatus(JobStatus.FAILURE_BEFORE_EXECUTION, exceptions);
             runMapArchive.put(jobId, jobRun);
             runMapHistory.put(jobId, exceptions);
@@ -170,7 +170,6 @@ public class JobManager implements IJobManager {
             return;
         }
         if (run.getPendingStatus() != null) {
-            LOGGER.warn("Ignoring duplicate cleanup for JobRun with id: {}", 
run::getJobId);
             return;
         }
         Set<String> targetNodes = run.getParticipatingNodeIds();
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index ac29b530d4..8f9194453a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@ -28,6 +28,7 @@ import java.util.Set;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
+import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.partitions.PartitionId;
 import org.apache.hyracks.control.common.job.PartitionDescriptor;
 import org.apache.hyracks.control.common.job.PartitionRequest;
@@ -43,14 +44,13 @@ public class PartitionMatchMaker {
     private final Map<PartitionId, List<PartitionRequest>> partitionRequests;
 
     public PartitionMatchMaker() {
-        partitionDescriptors = new HashMap<PartitionId, 
List<PartitionDescriptor>>();
-        partitionRequests = new HashMap<PartitionId, List<PartitionRequest>>();
+        partitionDescriptors = new HashMap<>();
+        partitionRequests = new HashMap<>();
     }
 
     public List<Pair<PartitionDescriptor, PartitionRequest>> 
registerPartitionDescriptor(
             PartitionDescriptor partitionDescriptor) {
-        List<Pair<PartitionDescriptor, PartitionRequest>> matches =
-                new ArrayList<Pair<PartitionDescriptor, PartitionRequest>>();
+        List<Pair<PartitionDescriptor, PartitionRequest>> matches = new 
ArrayList<>();
         PartitionId pid = partitionDescriptor.getPartitionId();
         boolean matched = false;
         List<PartitionRequest> requests = partitionRequests.get(pid);
@@ -73,11 +73,7 @@ public class PartitionMatchMaker {
         }
 
         if (!matched) {
-            List<PartitionDescriptor> descriptors = 
partitionDescriptors.get(pid);
-            if (descriptors == null) {
-                descriptors = new ArrayList<PartitionDescriptor>();
-                partitionDescriptors.put(pid, descriptors);
-            }
+            List<PartitionDescriptor> descriptors = 
partitionDescriptors.computeIfAbsent(pid, k -> new ArrayList<>());
             descriptors.add(partitionDescriptor);
         }
 
@@ -108,11 +104,7 @@ public class PartitionMatchMaker {
         }
 
         if (match == null) {
-            List<PartitionRequest> requests = partitionRequests.get(pid);
-            if (requests == null) {
-                requests = new ArrayList<PartitionRequest>();
-                partitionRequests.put(pid, requests);
-            }
+            List<PartitionRequest> requests = 
partitionRequests.computeIfAbsent(pid, k -> new ArrayList<>());
             requests.add(partitionRequest);
         }
 
@@ -133,17 +125,11 @@ public class PartitionMatchMaker {
     }
 
     private interface IEntryFilter<T> {
-        public boolean matches(T o);
+        boolean matches(T o);
     }
 
     private static <T> void removeEntries(List<T> list, IEntryFilter<T> 
filter) {
-        Iterator<T> j = list.iterator();
-        while (j.hasNext()) {
-            T o = j.next();
-            if (filter.matches(o)) {
-                j.remove();
-            }
-        }
+        list.removeIf(filter::matches);
     }
 
     private static <T> void removeEntries(Map<PartitionId, List<T>> map, 
IEntryFilter<T> filter) {
@@ -159,30 +145,16 @@ public class PartitionMatchMaker {
     }
 
     public void notifyNodeFailures(final Collection<String> deadNodes) {
-        removeEntries(partitionDescriptors, new 
IEntryFilter<PartitionDescriptor>() {
-            @Override
-            public boolean matches(PartitionDescriptor o) {
-                return deadNodes.contains(o.getNodeId());
-            }
-        });
-        removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() {
-            @Override
-            public boolean matches(PartitionRequest o) {
-                return deadNodes.contains(o.getNodeId());
-            }
-        });
+        removeEntries(partitionDescriptors, o -> 
deadNodes.contains(o.getNodeId()));
+        removeEntries(partitionRequests, o -> 
deadNodes.contains(o.getNodeId()));
     }
 
-    public void removeUncommittedPartitions(Set<PartitionId> partitionIds, 
final Set<TaskAttemptId> taIds) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Removing uncommitted partitions: " + partitionIds);
+    public void removeUncommittedPartitions(Set<PartitionId> partitionIds, 
Set<TaskAttemptId> taIds, JobId jobId) {
+        if (!partitionIds.isEmpty()) {
+            LOGGER.debug("Removing uncommitted partitions {}: {}", jobId, 
partitionIds);
         }
-        IEntryFilter<PartitionDescriptor> filter = new 
IEntryFilter<PartitionDescriptor>() {
-            @Override
-            public boolean matches(PartitionDescriptor o) {
-                return o.getState() != PartitionState.COMMITTED && 
taIds.contains(o.getProducingTaskAttemptId());
-            }
-        };
+        IEntryFilter<PartitionDescriptor> filter =
+                o -> o.getState() != PartitionState.COMMITTED && 
taIds.contains(o.getProducingTaskAttemptId());
         for (PartitionId pid : partitionIds) {
             List<PartitionDescriptor> descriptors = 
partitionDescriptors.get(pid);
             if (descriptors != null) {
@@ -194,16 +166,11 @@ public class PartitionMatchMaker {
         }
     }
 
-    public void removePartitionRequests(Set<PartitionId> partitionIds, final 
Set<TaskAttemptId> taIds) {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("Removing partition requests: " + partitionIds);
+    public void removePartitionRequests(Set<PartitionId> partitionIds, 
Set<TaskAttemptId> taIds, JobId jobId) {
+        if (!partitionIds.isEmpty()) {
+            LOGGER.debug("Removing partition requests {}: {}", jobId, 
partitionIds);
         }
-        IEntryFilter<PartitionRequest> filter = new 
IEntryFilter<PartitionRequest>() {
-            @Override
-            public boolean matches(PartitionRequest o) {
-                return taIds.contains(o.getRequestingTaskAttemptId());
-            }
-        };
+        IEntryFilter<PartitionRequest> filter = o -> 
taIds.contains(o.getRequestingTaskAttemptId());
         for (PartitionId pid : partitionIds) {
             List<PartitionRequest> requests = partitionRequests.get(pid);
             if (requests != null) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
index 9f8a7e24ea..46dd3512af 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/result/ResultDirectoryService.java
@@ -45,7 +45,6 @@ import org.apache.hyracks.api.result.ResultSetMetaData;
 import org.apache.hyracks.control.common.result.AbstractResultManager;
 import org.apache.hyracks.control.common.result.ResultStateSweeper;
 import org.apache.hyracks.control.common.work.IResultCallback;
-import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -79,9 +78,7 @@ public class ResultDirectoryService extends 
AbstractResultManager implements IRe
 
     @Override
     public synchronized void notifyJobCreation(JobId jobId, JobSpecification 
spec) throws HyracksException {
-        if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug(getClass().getSimpleName() + " notified of new job " 
+ jobId);
-        }
+        LOGGER.debug("{} notified of new job {}", getClass().getSimpleName(), 
jobId);
         if (jobResultLocations.get(jobId) != null) {
             throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, 
jobId);
         }
@@ -157,15 +154,14 @@ public class ResultDirectoryService extends 
AbstractResultManager implements IRe
 
     @Override
     public synchronized void reportJobFailure(JobId jobId, List<Exception> 
exceptions) {
-        Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
-        Level logLevel = Level.DEBUG;
-        if (LOGGER.isEnabled(logLevel)) {
-            LOGGER.log(logLevel, "job " + jobId + " failed and is being 
reported to " + getClass().getSimpleName(), ex);
-        }
         ResultJobRecord rjr = getResultJobRecord(jobId);
+        if (logFailure(rjr)) {
+            LOGGER.debug("job {} failed and is being reported to {}", jobId, 
getClass().getSimpleName());
+        }
         if (rjr != null) {
             rjr.fail(exceptions);
         }
+        Exception ex = exceptions.isEmpty() ? null : exceptions.get(0);
         final JobResultInfo jobResultInfo = jobResultLocations.get(jobId);
         if (jobResultInfo != null) {
             jobResultInfo.setException(ex);
@@ -211,6 +207,15 @@ public class ResultDirectoryService extends 
AbstractResultManager implements IRe
         }
     }
 
+    private static boolean logFailure(ResultJobRecord rjr) {
+        if (rjr == null) {
+            return true;
+        }
+        // don't re-log if the state is already failed
+        ResultJobRecord.Status status = rjr.getStatus();
+        return status == null || status.getState() != State.FAILED;
+    }
+
     /**
      * Compares the records already known by the client for the given job's 
result set id with the records that the
      * result directory service knows and if there are any newly discovered 
records returns a whole array with the
@@ -264,7 +269,7 @@ public class ResultDirectoryService extends 
AbstractResultManager implements IRe
 
 class JobResultInfo {
 
-    private ResultJobRecord record;
+    private final ResultJobRecord record;
     private Waiters waiters;
     private Exception exception;
 
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
index 77d2f82f9c..6454804154 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/JobCleanupWork.java
@@ -34,11 +34,11 @@ import org.apache.logging.log4j.Logger;
 public class JobCleanupWork extends AbstractWork {
     private static final Logger LOGGER = LogManager.getLogger();
 
-    private IJobManager jobManager;
-    private JobId jobId;
-    private JobStatus status;
-    private List<Exception> exceptions;
-    private IResultCallback<Void> callback;
+    private final IJobManager jobManager;
+    private final JobId jobId;
+    private final JobStatus status;
+    private final List<Exception> exceptions;
+    private final IResultCallback<Void> callback;
 
     public JobCleanupWork(IJobManager jobManager, JobId jobId, JobStatus 
status, List<Exception> exceptions,
             IResultCallback<Void> callback) {
@@ -51,9 +51,6 @@ public class JobCleanupWork extends AbstractWork {
 
     @Override
     public void run() {
-        if (LOGGER.isInfoEnabled()) {
-            LOGGER.info("Cleanup for job: {}", jobId);
-        }
         final JobRun jobRun = jobManager.get(jobId);
         if (jobRun == null) {
             LOGGER.debug("Ignoring cleanup for unknown job: {}", jobId);
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
index 833066ea4a..33d391f22e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/TaskFailureWork.java
@@ -22,17 +22,13 @@ import java.util.List;
 
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
-import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.cc.job.IJobManager;
 import org.apache.hyracks.control.cc.job.JobRun;
 import org.apache.hyracks.control.cc.job.TaskAttempt;
-import org.apache.logging.log4j.Level;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
 
 public class TaskFailureWork extends AbstractTaskLifecycleWork {
-    private static final Logger LOGGER = LogManager.getLogger();
+
     private final List<Exception> exceptions;
 
     public TaskFailureWork(ClusterControllerService ccs, JobId jobId, 
TaskAttemptId taId, String nodeId,
@@ -43,9 +39,6 @@ public class TaskFailureWork extends 
AbstractTaskLifecycleWork {
 
     @Override
     protected void performEvent(TaskAttempt ta) {
-        Exception ex = exceptions.get(0);
-        LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : 
Level.WARN,
-                "Executing task failure work for " + this, ex);
         IJobManager jobManager = ccs.getJobManager();
         JobRun run = jobManager.get(jobId);
         if (run == null) {
diff --git 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
index b0c60aa3d3..6dd430720a 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-nc/src/main/java/org/apache/hyracks/control/nc/work/NotifyTaskFailureWork.java
@@ -23,7 +23,6 @@ import java.util.List;
 import org.apache.hyracks.api.dataflow.TaskAttemptId;
 import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.result.IResultPartitionManager;
-import org.apache.hyracks.api.util.ExceptionUtils;
 import org.apache.hyracks.control.common.work.AbstractWork;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.apache.hyracks.control.nc.Task;
@@ -50,9 +49,6 @@ public class NotifyTaskFailureWork extends AbstractWork {
 
     @Override
     public void run() {
-        Exception ex = exceptions.get(0);
-        LOGGER.log(ExceptionUtils.causedByInterrupt(ex) ? Level.DEBUG : 
Level.WARN, "task " + taskId + " has failed",
-                ex);
         try {
             IResultPartitionManager resultPartitionManager = 
ncs.getResultPartitionManager();
             if (resultPartitionManager != null) {

Reply via email to