This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b847512332 Remove unused task action SegmentLockReleaseAction (#16422)
3b847512332 is described below

commit 3b847512332dc1608f9ca13459fc24d97d58d050
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri May 10 06:38:29 2024 +0530

    Remove unused task action SegmentLockReleaseAction (#16422)
    
    Changes:
    - Remove `SegmentLockReleaseAction` as it is not used anywhere.
    It is not even registered as a known sub-type of `TaskAction`.
    - Minor refactor in `TaskLockbox`. No functional change.
    - Remove `ExpectedException` from `TaskLockboxTest`
---
 .../common/actions/SegmentLockReleaseAction.java   |  85 --------
 .../druid/indexing/overlord/TaskLockbox.java       | 232 ++++++++-------------
 .../druid/indexing/overlord/TaskLockboxTest.java   |  47 ++---
 3 files changed, 107 insertions(+), 257 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java
deleted file mode 100644
index 6a47091710b..00000000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.actions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.druid.indexing.common.task.Task;
-import org.joda.time.Interval;
-
-/**
- * TaskAction to release a {@link org.apache.druid.indexing.common.SegmentLock}.
- * Used by batch tasks when they fail to acquire all necessary locks.
- */
-public class SegmentLockReleaseAction implements TaskAction<Void>
-{
-  private final Interval interval;
-  private final int partitionId;
-
-  @JsonCreator
-  public SegmentLockReleaseAction(@JsonProperty Interval interval, @JsonProperty int partitionId)
-  {
-    this.interval = interval;
-    this.partitionId = partitionId;
-  }
-
-  @JsonProperty
-  public Interval getInterval()
-  {
-    return interval;
-  }
-
-  @JsonProperty
-  public int getPartitionId()
-  {
-    return partitionId;
-  }
-
-  @Override
-  public TypeReference<Void> getReturnTypeReference()
-  {
-    return new TypeReference<Void>()
-    {
-    };
-  }
-
-  @Override
-  public Void perform(Task task, TaskActionToolbox toolbox)
-  {
-    toolbox.getTaskLockbox().unlock(task, interval, partitionId);
-    return null;
-  }
-
-  @Override
-  public boolean isAudited()
-  {
-    return false;
-  }
-
-  @Override
-  public String toString()
-  {
-    return "SegmentLockReleaseAction{" +
-           "interval=" + interval +
-           ", partitionId=" + partitionId +
-           '}';
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 1155592f325..7776663330b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -24,7 +24,6 @@ import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
@@ -63,13 +62,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Optional;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 /**
@@ -766,30 +765,12 @@ public class TaskLockbox
     giant.lock();
 
     try {
-      return action.perform(isTaskLocksValid(task, intervals));
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  /**
-   * Check all locks task acquired are still valid.
-   * It doesn't check other semantics like acquired locks are enough to 
overwrite existing segments.
-   * This kind of semantic should be checked in each caller of {@link 
#doInCriticalSection}.
-   */
-  private boolean isTaskLocksValid(Task task, Set<Interval> intervals)
-  {
-    giant.lock();
-    try {
-      return intervals
-          .stream()
-          .allMatch(interval -> {
-            final List<TaskLockPosse> lockPosses = 
getOnlyTaskLockPosseContainingInterval(task, interval);
-            return 
lockPosses.stream().map(TaskLockPosse::getTaskLock).noneMatch(
-                TaskLock::isRevoked
-            );
-          });
+      // Check if any of the locks held by this task have been revoked
+      final boolean areTaskLocksValid = intervals.stream().noneMatch(interval 
-> {
+        Optional<TaskLockPosse> lockPosse = 
getOnlyTaskLockPosseContainingInterval(task, interval);
+        return lockPosse.isPresent() && 
lockPosse.get().getTaskLock().isRevoked();
+      });
+      return action.perform(areTaskLocksValid);
     }
     finally {
       giant.unlock();
@@ -801,7 +782,7 @@ public class TaskLockbox
     giant.lock();
 
     try {
-      lockPosse.forEachTask(taskId -> revokeLock(taskId, 
lockPosse.getTaskLock()));
+      lockPosse.taskIds.forEach(taskId -> revokeLock(taskId, 
lockPosse.getTaskLock()));
     }
     finally {
       giant.unlock();
@@ -1083,22 +1064,20 @@ public class TaskLockbox
    * @param task     task to unlock
    * @param interval interval to unlock
    */
-  public void unlock(final Task task, final Interval interval, @Nullable 
Integer partitionId)
+  private void unlock(final Task task, final Interval interval, @Nullable 
Integer partitionId)
   {
     giant.lock();
 
     try {
       final String dataSource = task.getDataSource();
-      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> 
dsRunning = running.get(
-          task.getDataSource()
-      );
-
-      if (dsRunning == null || dsRunning.isEmpty()) {
+      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> 
locksForDatasource
+          = running.get(task.getDataSource());
+      if (locksForDatasource == null || locksForDatasource.isEmpty()) {
         return;
       }
 
-      final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses = 
dsRunning.get(interval.getStart());
-
+      final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses
+          = locksForDatasource.get(interval.getStart());
       if (intervalToPosses == null || intervalToPosses.isEmpty()) {
         return;
       }
@@ -1126,18 +1105,15 @@ public class TaskLockbox
           final boolean removed = taskLockPosse.removeTask(task);
 
           if (taskLockPosse.isTasksEmpty()) {
-            log.info("TaskLock is now empty: %s", taskLock);
+            log.info("TaskLock[%s] is now empty.", taskLock);
             possesHolder.remove(taskLockPosse);
           }
-
           if (possesHolder.isEmpty()) {
             intervalToPosses.remove(interval);
           }
-
           if (intervalToPosses.isEmpty()) {
-            dsRunning.remove(interval.getStart());
+            locksForDatasource.remove(interval.getStart());
           }
-
           if (running.get(dataSource).isEmpty()) {
             running.remove(dataSource);
           }
@@ -1227,39 +1203,7 @@ public class TaskLockbox
     try {
       try {
         log.info("Removing task[%s] from activeTasks", task.getId());
-        try {
-          // Clean upgrade segments table for entries associated with 
replacing task
-          if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() 
== TaskLockType.REPLACE)) {
-            final int upgradeSegmentsDeleted = 
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
-            log.info(
-                "Deleted [%d] entries from upgradeSegments table for task[%s] 
with REPLACE locks.",
-                upgradeSegmentsDeleted, task.getId()
-            );
-          }
-          // Clean pending segments associated with the appending task
-          if (task instanceof PendingSegmentAllocatingTask) {
-            final String taskAllocatorId = ((PendingSegmentAllocatingTask) 
task).getTaskAllocatorId();
-            if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
-              final Set<String> idsInSameGroup = 
activeAllocatorIdToTaskIds.get(taskAllocatorId);
-              idsInSameGroup.remove(task.getId());
-              if (idsInSameGroup.isEmpty()) {
-                final int pendingSegmentsDeleted
-                    = 
metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(
-                        task.getDataSource(),
-                        taskAllocatorId
-                );
-                log.info(
-                    "Deleted [%d] entries from pendingSegments table for 
pending segments group [%s] with APPEND locks.",
-                    pendingSegmentsDeleted, taskAllocatorId
-                );
-              }
-              activeAllocatorIdToTaskIds.remove(taskAllocatorId);
-            }
-          }
-        }
-        catch (Exception e) {
-          log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments 
tables.");
-        }
+        cleanupUpgradeAndPendingSegments(task);
         unlockAll(task);
       }
       finally {
@@ -1271,33 +1215,63 @@ public class TaskLockbox
     }
   }
 
-  /**
-   * Return the currently-active lock posses for some task.
-   *
-   * @param task task for which to locate locks
-   */
-  private List<TaskLockPosse> findLockPossesForTask(final Task task)
+  @GuardedBy("giant")
+  private void cleanupUpgradeAndPendingSegments(Task task)
   {
-    giant.lock();
-
     try {
-      // Scan through all locks for this datasource
-      final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> 
dsRunning = running.get(task.getDataSource());
-      if (dsRunning == null) {
-        return ImmutableList.of();
-      } else {
-        return dsRunning.values().stream()
-                        .flatMap(map -> map.values().stream())
-                        .flatMap(Collection::stream)
-                        .filter(taskLockPosse -> 
taskLockPosse.containsTask(task))
-                        .collect(Collectors.toList());
+      // Clean up upgrade segment entries associated with a REPLACE task
+      if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == 
TaskLockType.REPLACE)) {
+        final int upgradeSegmentsDeleted = 
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+        log.info(
+            "Deleted [%d] entries from upgradeSegments table for task[%s] with 
REPLACE locks.",
+            upgradeSegmentsDeleted, task.getId()
+        );
+      }
+
+      // Clean up pending segments associated with an APPEND task
+      if (task instanceof PendingSegmentAllocatingTask) {
+        final String taskAllocatorId = ((PendingSegmentAllocatingTask) 
task).getTaskAllocatorId();
+        if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
+          final Set<String> taskIdsForSameAllocator = 
activeAllocatorIdToTaskIds.get(taskAllocatorId);
+          taskIdsForSameAllocator.remove(task.getId());
+
+          if (taskIdsForSameAllocator.isEmpty()) {
+            final int pendingSegmentsDeleted = metadataStorageCoordinator
+                .deletePendingSegmentsForTaskAllocatorId(task.getDataSource(), 
taskAllocatorId);
+            log.info(
+                "Deleted [%d] entries from pendingSegments table for 
taskAllocatorId[%s].",
+                pendingSegmentsDeleted, taskAllocatorId
+            );
+          }
+          activeAllocatorIdToTaskIds.remove(taskAllocatorId);
+        }
       }
     }
-    finally {
-      giant.unlock();
+    catch (Exception e) {
+      log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments 
tables.");
     }
   }
 
+  /**
+   * Finds all the lock posses for the given task.
+   */
+  @GuardedBy("giant")
+  private List<TaskLockPosse> findLockPossesForTask(final Task task)
+  {
+    // Scan through all locks for this datasource
+    final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>> 
locksForDatasource
+        = running.get(task.getDataSource());
+    if (locksForDatasource == null) {
+      return Collections.emptyList();
+    }
+
+    return locksForDatasource.values().stream()
+                             .flatMap(map -> map.values().stream())
+                             .flatMap(Collection::stream)
+                             .filter(taskLockPosse -> 
taskLockPosse.containsTask(task))
+                             .collect(Collectors.toList());
+  }
+
   private List<TaskLockPosse> findLockPossesContainingInterval(final String 
dataSource, final Interval interval)
   {
     giant.lock();
@@ -1342,19 +1316,7 @@ public class TaskLockbox
   }
 
   @VisibleForTesting
-  List<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task, 
Interval interval)
-  {
-    giant.lock();
-    try {
-      return getOnlyTaskLockPosseContainingInterval(task, interval, 
Collections.emptySet());
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  @VisibleForTesting
-  List<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task, 
Interval interval, Set<Integer> partitionIds)
+  Optional<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task, 
Interval interval)
   {
     giant.lock();
     try {
@@ -1364,35 +1326,20 @@ public class TaskLockbox
           .collect(Collectors.toList());
 
       if (filteredPosses.isEmpty()) {
-        throw new ISE("Cannot find locks for task[%s] and interval[%s]", 
task.getId(), interval);
-      } else if (filteredPosses.size() > 1) {
-        if (filteredPosses.stream()
-                          .anyMatch(posse -> 
posse.getTaskLock().getGranularity() == LockGranularity.TIME_CHUNK)) {
-          throw new ISE(
-              "There are multiple timeChunk lockPosses for task[%s] and 
interval[%s]?",
-              task.getId(),
-              interval
-          );
-        } else {
-          final Map<Integer, TaskLockPosse> partitionIdsOfLocks = new 
HashMap<>();
-          for (TaskLockPosse posse : filteredPosses) {
-            final SegmentLock segmentLock = (SegmentLock) posse.getTaskLock();
-            partitionIdsOfLocks.put(segmentLock.getPartitionId(), posse);
-          }
-
-          if 
(partitionIds.stream().allMatch(partitionIdsOfLocks::containsKey)) {
-            return 
partitionIds.stream().map(partitionIdsOfLocks::get).collect(Collectors.toList());
-          } else {
-            throw new ISE(
-                "Task[%s] doesn't have locks for interval[%s] partitions[%]",
-                task.getId(),
-                interval,
-                partitionIds.stream().filter(pid -> 
!partitionIdsOfLocks.containsKey(pid)).collect(Collectors.toList())
-            );
-          }
-        }
+        throw new ISE("Cannot find any lock for task[%s] and interval[%s]", 
task.getId(), interval);
+      } else if (filteredPosses.size() == 1) {
+        return Optional.of(filteredPosses.get(0));
+      } else if (
+          filteredPosses.stream().anyMatch(
+              posse -> posse.taskLock.getGranularity() == 
LockGranularity.TIME_CHUNK
+          )
+      ) {
+        throw new ISE(
+            "There are multiple timechunk lockPosses for task[%s] and 
interval[%s]",
+            task.getId(), interval
+        );
       } else {
-        return filteredPosses;
+        return Optional.empty();
       }
     }
     finally {
@@ -1635,17 +1582,13 @@ public class TaskLockbox
         Preconditions.checkArgument(
             taskLock.getGroupId().equals(task.getGroupId()),
             "groupId[%s] of task[%s] is different from the existing 
lockPosse's groupId[%s]",
-            task.getGroupId(),
-            task.getId(),
-            taskLock.getGroupId()
+            task.getGroupId(), task.getId(), taskLock.getGroupId()
         );
       }
       Preconditions.checkArgument(
           taskLock.getNonNullPriority() == task.getPriority(),
           "priority[%s] of task[%s] is different from the existing lockPosse's 
priority[%s]",
-          task.getPriority(),
-          task.getId(),
-          taskLock.getNonNullPriority()
+          task.getPriority(), task.getId(), taskLock.getNonNullPriority()
       );
       return taskIds.add(task.getId());
     }
@@ -1724,12 +1667,6 @@ public class TaskLockbox
       return false;
     }
 
-    void forEachTask(Consumer<String> action)
-    {
-      Preconditions.checkNotNull(action, "action");
-      taskIds.forEach(action);
-    }
-
     @Override
     public boolean equals(Object o)
     {
@@ -1801,8 +1738,7 @@ public class TaskLockbox
   /**
    * Contains the task, request, lock and final result for a segment 
allocation.
    */
-  @VisibleForTesting
-  static class SegmentAllocationHolder
+  private static class SegmentAllocationHolder
   {
     final AllocationHolderList list;
 
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index e6b20dd073e..a02b5108767 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -73,7 +73,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -83,6 +82,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -91,9 +91,6 @@ public class TaskLockboxTest
   @Rule
   public final TestDerbyConnector.DerbyConnectorRule derby = new 
TestDerbyConnector.DerbyConnectorRule();
 
-  @Rule
-  public ExpectedException expectedException = ExpectedException.none();
-
   private ObjectMapper objectMapper;
   private TaskStorage taskStorage;
   private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
@@ -105,9 +102,6 @@ public class TaskLockboxTest
   private final int MEDIUM_PRIORITY = 10;
   private final int LOW_PRIORITY = 5;
 
-  @Rule
-  public final ExpectedException exception = ExpectedException.none();
-
   @Before
   public void setup()
   {
@@ -186,14 +180,16 @@ public class TaskLockboxTest
   }
 
   @Test
-  public void testLockAfterTaskComplete() throws InterruptedException
+  public void testLockAfterTaskComplete()
   {
-    Task task = NoopTask.create();
-    exception.expect(ISE.class);
-    exception.expectMessage("Unable to grant lock to inactive Task");
+    final Task task = NoopTask.create();
     lockbox.add(task);
     lockbox.remove(task);
-    acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, 
Intervals.of("2015-01-01/2015-01-02"));
+    ISE exception = Assert.assertThrows(
+        ISE.class,
+        () -> acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task, 
Intervals.of("2015-01-01/2015-01-02"))
+    );
+    Assert.assertTrue(exception.getMessage().contains("Unable to grant lock to 
inactive Task"));
   }
 
   @Test
@@ -311,12 +307,15 @@ public class TaskLockboxTest
   @Test
   public void testTryLockAfterTaskComplete()
   {
-    Task task = NoopTask.create();
-    exception.expect(ISE.class);
-    exception.expectMessage("Unable to grant lock to inactive Task");
+    final Task task = NoopTask.create();
     lockbox.add(task);
     lockbox.remove(task);
-    Assert.assertFalse(tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, 
Intervals.of("2015-01-01/2015-01-02")).isOk());
+
+    ISE exception = Assert.assertThrows(
+        ISE.class,
+        () -> tryTimeChunkLock(TaskLockType.EXCLUSIVE, task, 
Intervals.of("2015-01-01/2015-01-02"))
+    );
+    Assert.assertTrue(exception.getMessage().contains("Unable to grant lock to 
inactive Task"));
   }
 
   @Test
@@ -759,23 +758,23 @@ public class TaskLockboxTest
         ).isOk()
     );
 
-    final List<TaskLockPosse> highLockPosses = 
lockbox.getOnlyTaskLockPosseContainingInterval(
+    final Optional<TaskLockPosse> highLockPosse = 
lockbox.getOnlyTaskLockPosseContainingInterval(
         highPriorityTask,
         Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
     );
 
-    Assert.assertEquals(1, highLockPosses.size());
-    Assert.assertTrue(highLockPosses.get(0).containsTask(highPriorityTask));
-    Assert.assertFalse(highLockPosses.get(0).getTaskLock().isRevoked());
+    Assert.assertTrue(highLockPosse.isPresent());
+    Assert.assertTrue(highLockPosse.get().containsTask(highPriorityTask));
+    Assert.assertFalse(highLockPosse.get().getTaskLock().isRevoked());
 
-    final List<TaskLockPosse> lowLockPosses = 
lockbox.getOnlyTaskLockPosseContainingInterval(
+    final Optional<TaskLockPosse> lowLockPosse = 
lockbox.getOnlyTaskLockPosseContainingInterval(
         lowPriorityTask,
         Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
     );
 
-    Assert.assertEquals(1, lowLockPosses.size());
-    Assert.assertTrue(lowLockPosses.get(0).containsTask(lowPriorityTask));
-    Assert.assertTrue(lowLockPosses.get(0).getTaskLock().isRevoked());
+    Assert.assertTrue(lowLockPosse.isPresent());
+    Assert.assertTrue(lowLockPosse.get().containsTask(lowPriorityTask));
+    Assert.assertTrue(lowLockPosse.get().getTaskLock().isRevoked());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to