This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit ca5a0b22a48d4378c212d1e5a3c0802d23ece7de Merge: 5b4e88e3d9 f124592d2b Author: Ali Alsuliman <[email protected]> AuthorDate: Sat Jun 8 06:35:58 2024 +0300 Merge branch 'gerrit/stabilization-40cfb8705b' into 'gerrit/neo' Change-Id: Ia8359d2d0d8e1b39b1f2c15e5f367332671f9df2 .../org/apache/asterix/active/ActiveManager.java | 3 +- .../optimizer/base/AsterixOptimizationContext.java | 13 +++-- .../app/active/ActiveEntityEventsListener.java | 22 ++++---- .../app/active/ActiveNotificationHandler.java | 64 ++++++---------------- .../app/function/ActiveRequestsDatasource.java | 5 ++ .../app/function/CompletedRequestsDatasource.java | 5 ++ .../asterix/app/message/CancelQueryRequest.java | 10 ++-- .../asterix/app/message/CancelQueryResponse.java | 2 +- .../misc/active_requests/active_requests.4.regex | 2 +- .../completed_requests/completed_requests.3.regex | 2 +- .../metadata/declared/FunctionDataSource.java | 4 ++ .../apache/hyracks/control/cc/job/JobManager.java | 10 ++-- .../control/cc/partitions/PartitionMatchMaker.java | 4 +- .../control/cc/work/ApplicationMessageWork.java | 8 +-- .../cc/work/GetNodeControllersInfoWork.java | 1 + .../cc/work/GetResultDirectoryAddressWork.java | 7 +++ .../cc/work/GetResultPartitionLocationsWork.java | 6 ++ .../hyracks/control/cc/work/JobCleanupWork.java | 10 +++- .../hyracks/control/cc/work/JobStartWork.java | 6 ++ .../cc/work/JobletCleanupNotificationWork.java | 7 +++ .../hyracks/control/cc/work/RegisterNodeWork.java | 6 ++ .../control/cc/work/RemoveDeadNodesWork.java | 4 +- .../hyracks/control/cc/work/TaskCompleteWork.java | 11 ++++ .../hyracks/control/cc/work/TaskFailureWork.java | 4 +- .../control/cc/work/WaitForJobCompletionWork.java | 6 ++ .../control/nc/work/ApplicationMessageWork.java | 9 +-- .../hyracks/control/nc/work/CleanupJobletWork.java | 9 ++- .../control/nc/work/NotifyTaskCompleteWork.java | 18 ++++-- .../hyracks/control/nc/work/StartTasksWork.java | 2 +- .../storage/am/lsm/common/util/ComponentUtils.java | 51 +++-------------- 30 files changed, 167 insertions(+), 144 deletions(-) diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java index 8f9194453a,b5df59339b..6278693019 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java @@@ -145,16 -159,30 +145,16 @@@ public class PartitionMatchMaker } public void notifyNodeFailures(final Collection<String> deadNodes) { - removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() { - @Override - public boolean matches(PartitionDescriptor o) { - return deadNodes.contains(o.getNodeId()); - } - }); - removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() { - @Override - public boolean matches(PartitionRequest o) { - return deadNodes.contains(o.getNodeId()); - } - }); + removeEntries(partitionDescriptors, o -> deadNodes.contains(o.getNodeId())); + removeEntries(partitionRequests, o -> deadNodes.contains(o.getNodeId())); } - public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) { + public void removeUncommittedPartitions(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) { - if (!partitionIds.isEmpty()) { + if (partitionIds != null && !partitionIds.isEmpty()) { - LOGGER.debug("Removing uncommitted partitions {}", partitionIds); + LOGGER.debug("Removing uncommitted partitions {}: {}", jobId, partitionIds); } - IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() { - @Override - public boolean matches(PartitionDescriptor o) { - return o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId()); - } - }; + IEntryFilter<PartitionDescriptor> filter = + o -> o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId()); for (PartitionId pid : partitionIds) { List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid); if (descriptors != null) { @@@ -166,11 -194,16 +166,11 @@@ } } - public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) { + public void removePartitionRequests(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) { - if (!partitionIds.isEmpty()) { + if (partitionIds != null && !partitionIds.isEmpty()) { - LOGGER.debug("Removing partition requests {}", partitionIds); + LOGGER.debug("Removing partition requests {}: {}", jobId, partitionIds); } - IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() { - @Override - public boolean matches(PartitionRequest o) { - return taIds.contains(o.getRequestingTaskAttemptId()); - } - }; + IEntryFilter<PartitionRequest> filter = o -> taIds.contains(o.getRequestingTaskAttemptId()); for (PartitionId pid : partitionIds) { List<PartitionRequest> requests = partitionRequests.get(pid); if (requests != null) {
