This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f7a549123b Commit segments only when they are covered by active locks (#15027)
f7a549123b is described below
commit f7a549123b9783cf24c315879781f01dbfee55a0
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Sep 25 13:45:42 2023 +0530
Commit segments only when they are covered by active locks (#15027)
* Commit segments only when they are covered by active locks
---
.../druid/indexing/common/actions/TaskLocks.java | 13 ++++++++++++-
.../apache/druid/indexing/overlord/TaskLockbox.java | 2 +-
.../druid/indexing/common/actions/TaskLocksTest.java | 19 ++++++++++++++++++-
3 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
index 2f42ebb21c..bb83599780 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/TaskLocks.java
@@ -46,6 +46,7 @@ import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
+import java.util.stream.Collectors;
public class TaskLocks
{
@@ -93,6 +94,12 @@ public class TaskLocks
: DateTimes.nowUtc().toString();
}
+ /**
+ * Checks if the segments are covered by a non revoked lock
+ * @param taskLockMap task locks for a task
+ * @param segments segments to be checked
+ * @return true if each of the segments is covered by a non-revoked lock
+ */
public static boolean isLockCoversSegments(
NavigableMap<DateTime, List<TaskLock>> taskLockMap,
Collection<DataSegment> segments
@@ -105,7 +112,11 @@ public class TaskLocks
return false;
}
- final List<TaskLock> locks = entry.getValue();
+ // taskLockMap may contain revoked locks which need to be filtered
+ final List<TaskLock> locks = entry.getValue()
+ .stream()
+ .filter(lock -> !lock.isRevoked())
+ .collect(Collectors.toList());
return locks.stream().anyMatch(
lock -> {
if (lock.getGranularity() == LockGranularity.TIME_CHUNK) {
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index aaa563fa4e..761c0b5916 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -832,7 +832,7 @@ public class TaskLockbox
* @param lock lock to be revoked
*/
@VisibleForTesting
- protected void revokeLock(String taskId, TaskLock lock)
+ public void revokeLock(String taskId, TaskLock lock)
{
giant.lock();
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
index 43a403e59a..e5d1d2882e 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/TaskLocksTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.common.actions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.SegmentLock;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
@@ -34,6 +35,7 @@ import org.apache.druid.indexing.overlord.HeapMemoryTaskStorage;
import org.apache.druid.indexing.overlord.LockResult;
import org.apache.druid.indexing.overlord.SpecificSegmentLockRequest;
import org.apache.druid.indexing.overlord.TaskLockbox;
+import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.TimeChunkLockRequest;
import org.apache.druid.indexing.test.TestIndexerMetadataStorageCoordinator;
import org.apache.druid.java.util.common.DateTimes;
@@ -62,11 +64,13 @@ public class TaskLocksTest
@Before
public void setup()
{
+ final TaskStorage taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
lockbox = new TaskLockbox(
- new HeapMemoryTaskStorage(new TaskStorageConfig(null)),
+ taskStorage,
new TestIndexerMetadataStorageCoordinator()
);
task = NoopTask.create();
+ taskStorage.insert(task, TaskStatus.running(task.getId()));
lockbox.add(task);
}
@@ -296,6 +300,19 @@ public class TaskLocksTest
);
}
+ @Test
+ public void testRevokedLocksDoNotCoverSegments()
+ {
+ final Set<DataSegment> segments = createNumberedPartitionedSegments();
+ final Interval interval = Intervals.of("2017-01-01/2017-01-02");
+
+ final TaskLock lock = tryTimeChunkLock(task, interval, TaskLockType.EXCLUSIVE);
+ Assert.assertTrue(TaskLocks.isLockCoversSegments(task, lockbox, segments));
+
+ lockbox.revokeLock(task.getId(), lock);
+ Assert.assertFalse(TaskLocks.isLockCoversSegments(task, lockbox, segments));
+ }
+
@Test
public void testFindReplaceLocksCoveringSegments()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]