This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit ca5a0b22a48d4378c212d1e5a3c0802d23ece7de
Merge: 5b4e88e3d9 f124592d2b
Author: Ali Alsuliman <[email protected]>
AuthorDate: Sat Jun 8 06:35:58 2024 +0300

    Merge branch 'gerrit/stabilization-40cfb8705b' into 'gerrit/neo'
    
    Change-Id: Ia8359d2d0d8e1b39b1f2c15e5f367332671f9df2

 .../org/apache/asterix/active/ActiveManager.java   |  3 +-
 .../optimizer/base/AsterixOptimizationContext.java | 13 +++--
 .../app/active/ActiveEntityEventsListener.java     | 22 ++++----
 .../app/active/ActiveNotificationHandler.java      | 64 ++++++----------------
 .../app/function/ActiveRequestsDatasource.java     |  5 ++
 .../app/function/CompletedRequestsDatasource.java  |  5 ++
 .../asterix/app/message/CancelQueryRequest.java    | 10 ++--
 .../asterix/app/message/CancelQueryResponse.java   |  2 +-
 .../misc/active_requests/active_requests.4.regex   |  2 +-
 .../completed_requests/completed_requests.3.regex  |  2 +-
 .../metadata/declared/FunctionDataSource.java      |  4 ++
 .../apache/hyracks/control/cc/job/JobManager.java  | 10 ++--
 .../control/cc/partitions/PartitionMatchMaker.java |  4 +-
 .../control/cc/work/ApplicationMessageWork.java    |  8 +--
 .../cc/work/GetNodeControllersInfoWork.java        |  1 +
 .../cc/work/GetResultDirectoryAddressWork.java     |  7 +++
 .../cc/work/GetResultPartitionLocationsWork.java   |  6 ++
 .../hyracks/control/cc/work/JobCleanupWork.java    | 10 +++-
 .../hyracks/control/cc/work/JobStartWork.java      |  6 ++
 .../cc/work/JobletCleanupNotificationWork.java     |  7 +++
 .../hyracks/control/cc/work/RegisterNodeWork.java  |  6 ++
 .../control/cc/work/RemoveDeadNodesWork.java       |  4 +-
 .../hyracks/control/cc/work/TaskCompleteWork.java  | 11 ++++
 .../hyracks/control/cc/work/TaskFailureWork.java   |  4 +-
 .../control/cc/work/WaitForJobCompletionWork.java  |  6 ++
 .../control/nc/work/ApplicationMessageWork.java    |  9 +--
 .../hyracks/control/nc/work/CleanupJobletWork.java |  9 ++-
 .../control/nc/work/NotifyTaskCompleteWork.java    | 18 ++++--
 .../hyracks/control/nc/work/StartTasksWork.java    |  2 +-
 .../storage/am/lsm/common/util/ComponentUtils.java | 51 +++--------------
 30 files changed, 167 insertions(+), 144 deletions(-)

diff --cc hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
index 8f9194453a,b5df59339b..6278693019
--- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
+++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/partitions/PartitionMatchMaker.java
@@@ -145,16 -159,30 +145,16 @@@ public class PartitionMatchMaker 
      }
  
      public void notifyNodeFailures(final Collection<String> deadNodes) {
 -        removeEntries(partitionDescriptors, new IEntryFilter<PartitionDescriptor>() {
 -            @Override
 -            public boolean matches(PartitionDescriptor o) {
 -                return deadNodes.contains(o.getNodeId());
 -            }
 -        });
 -        removeEntries(partitionRequests, new IEntryFilter<PartitionRequest>() {
 -            @Override
 -            public boolean matches(PartitionRequest o) {
 -                return deadNodes.contains(o.getNodeId());
 -            }
 -        });
 +        removeEntries(partitionDescriptors, o -> deadNodes.contains(o.getNodeId()));
 +        removeEntries(partitionRequests, o -> deadNodes.contains(o.getNodeId()));
      }
  
 -    public void removeUncommittedPartitions(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
 +    public void removeUncommittedPartitions(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) {
-         if (!partitionIds.isEmpty()) {
+         if (partitionIds != null && !partitionIds.isEmpty()) {
 -            LOGGER.debug("Removing uncommitted partitions {}", partitionIds);
 +            LOGGER.debug("Removing uncommitted partitions {}: {}", jobId, partitionIds);
          }
 -        IEntryFilter<PartitionDescriptor> filter = new IEntryFilter<PartitionDescriptor>() {
 -            @Override
 -            public boolean matches(PartitionDescriptor o) {
 -                return o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
 -            }
 -        };
 +        IEntryFilter<PartitionDescriptor> filter =
 +                o -> o.getState() != PartitionState.COMMITTED && taIds.contains(o.getProducingTaskAttemptId());
          for (PartitionId pid : partitionIds) {
              List<PartitionDescriptor> descriptors = partitionDescriptors.get(pid);
              if (descriptors != null) {
@@@ -166,11 -194,16 +166,11 @@@
          }
      }
  
 -    public void removePartitionRequests(Set<PartitionId> partitionIds, final Set<TaskAttemptId> taIds) {
 +    public void removePartitionRequests(Set<PartitionId> partitionIds, Set<TaskAttemptId> taIds, JobId jobId) {
-         if (!partitionIds.isEmpty()) {
+         if (partitionIds != null && !partitionIds.isEmpty()) {
 -            LOGGER.debug("Removing partition requests {}", partitionIds);
 +            LOGGER.debug("Removing partition requests {}: {}", jobId, partitionIds);
          }
 -        IEntryFilter<PartitionRequest> filter = new IEntryFilter<PartitionRequest>() {
 -            @Override
 -            public boolean matches(PartitionRequest o) {
 -                return taIds.contains(o.getRequestingTaskAttemptId());
 -            }
 -        };
 +        IEntryFilter<PartitionRequest> filter = o -> taIds.contains(o.getRequestingTaskAttemptId());
          for (PartitionId pid : partitionIds) {
              List<PartitionRequest> requests = partitionRequests.get(pid);
              if (requests != null) {

Reply via email to