This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f7a549123b Commit segments only when they are covered by active locks 
(#15027)
f7a549123b is described below

commit f7a549123b9783cf24c315879781f01dbfee55a0
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Sep 25 13:45:42 2023 +0530

    Commit segments only when they are covered by active locks (#15027)
    
    * Commit segments only when they are covered by active locks
---
 .../druid/indexing/common/actions/TaskLocks.java      | 13 ++++++++++++-
 .../apache/druid/indexing/overlord/TaskLockbox.java   |  2 +-
 .../druid/indexing/common/actions/TaskLocksTest.java  | 19 ++++++++++++++++++-
 3 files changed, 31 insertions(+), 3 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
index 2f42ebb21c..bb83599780 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
@@ -46,6 +46,7 @@ import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 public class TaskLocks
 {
@@ -93,6 +94,12 @@ public class TaskLocks
            : DateTimes.nowUtc().toString();
   }
 
+  /**
+   * Checks if the segments are covered by a non revoked lock
+   * @param taskLockMap task locks for a task
+   * @param segments segments to be checked
+   * @return true if each of the segments is covered by a non-revoked lock
+   */
   public static boolean isLockCoversSegments(
       NavigableMap<DateTime, List<TaskLock>> taskLockMap,
       Collection<DataSegment> segments
@@ -105,7 +112,11 @@ public class TaskLocks
             return false;
           }
 
-          final List<TaskLock> locks = entry.getValue();
+          // taskLockMap may contain revoked locks which need to be filtered
+          final List<TaskLock> locks = entry.getValue()
+                                            .stream()
+                                            .filter(lock -> !lock.isRevoked())
+                                            .collect(Collectors.toList());
           return locks.stream().anyMatch(
               lock -> {
                 if (lock.getGranularity() == LockGranularity.TIME_CHUNK) {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index aaa563fa4e..761c0b5916 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -832,7 +832,7 @@ public class TaskLockbox
    * @param lock   lock to be revoked
    */
   @VisibleForTesting
-  protected void revokeLock(String taskId, TaskLock lock)
+  public void revokeLock(String taskId, TaskLock lock)
   {
     giant.lock();
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
index 43a403e59a..e5d1d2882e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.actions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.SegmentLock;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.TaskLockType;
@@ -34,6 +35,7 @@ import 
org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
 import org.apache.druid.indexing.overlord.LockResult;
 import org.apache.druid.indexing.overlord.SpecificSegmentLockRequest;
 import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
 import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
 import org.apache.druid.java.util.common.DateTimes;
@@ -62,11 +64,13 @@ public class TaskLocksTest
   @Before
   public void setup()
   {
+    final TaskStorage taskStorage = new HeapMemoryTaskStorage(new 
TaskStorageConfig(null));
     lockbox = new TaskLockbox(
-        new HeapMemoryTaskStorage(new TaskStorageConfig(null)),
+        taskStorage,
         new TestIndexerMetadataStorageCoordinator()
     );
     task = NoopTask.create();
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
     lockbox.add(task);
   }
 
@@ -296,6 +300,19 @@ public class TaskLocksTest
     );
   }
 
+  @Test
+  public void testRevokedLocksDoNotCoverSegments()
+  {
+    final Set<DataSegment> segments = createNumberedPartitionedSegments();
+    final Interval interval = Intervals.of("2017-01-01/2017-01-02");
+
+    final TaskLock lock = tryTimeChunkLock(task, interval, 
TaskLockType.EXCLUSIVE);
+    Assert.assertTrue(TaskLocks.isLockCoversSegments(task, lockbox, segments));
+
+    lockbox.revokeLock(task.getId(), lock);
+    Assert.assertFalse(TaskLocks.isLockCoversSegments(task, lockbox, 
segments));
+  }
+
   @Test
   public void testFindReplaceLocksCoveringSegments()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to