This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3b847512332 Remove unused task action SegmentLockReleaseAction (#16422)
3b847512332 is described below
commit 3b847512332dc1608f9ca13459fc24d97d58d050
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri May 10 06:38:29 2024 +0530
Remove unused task action SegmentLockReleaseAction (#16422)
Changes:
- Remove `SegmentLockReleaseAction` as it is not used anywhere.
It is not even registered as a known sub-type of `TaskAction`.
- Minor refactor in `TaskLockbox`. No functional change.
- Remove `ExpectedException` from `TaskLockboxTest`
---
.../common/actions/SegmentLockReleaseAction.java | 85 --------
.../druid/indexing/overlord/TaskLockbox.java | 232 ++++++++-------------
.../druid/indexing/overlord/TaskLockboxTest.java | 47 ++---
3 files changed, 107 insertions(+), 257 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java
deleted file mode 100644
index 6a47091710b..00000000000
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/actions/SegmentLockReleaseAction.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.actions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.druid.indexing.common.task.Task;
-import org.joda.time.Interval;
-
-/**
- * TaskAction to release a {@link
org.apache.druid.indexing.common.SegmentLock}.
- * Used by batch tasks when they fail to acquire all necessary locks.
- */
-public class SegmentLockReleaseAction implements TaskAction<Void>
-{
- private final Interval interval;
- private final int partitionId;
-
- @JsonCreator
- public SegmentLockReleaseAction(@JsonProperty Interval interval,
@JsonProperty int partitionId)
- {
- this.interval = interval;
- this.partitionId = partitionId;
- }
-
- @JsonProperty
- public Interval getInterval()
- {
- return interval;
- }
-
- @JsonProperty
- public int getPartitionId()
- {
- return partitionId;
- }
-
- @Override
- public TypeReference<Void> getReturnTypeReference()
- {
- return new TypeReference<Void>()
- {
- };
- }
-
- @Override
- public Void perform(Task task, TaskActionToolbox toolbox)
- {
- toolbox.getTaskLockbox().unlock(task, interval, partitionId);
- return null;
- }
-
- @Override
- public boolean isAudited()
- {
- return false;
- }
-
- @Override
- public String toString()
- {
- return "SegmentLockReleaseAction{" +
- "interval=" + interval +
- ", partitionId=" + partitionId +
- '}';
- }
-}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 1155592f325..7776663330b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -24,7 +24,6 @@ import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
@@ -63,13 +62,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
import java.util.stream.Collectors;
/**
@@ -766,30 +765,12 @@ public class TaskLockbox
giant.lock();
try {
- return action.perform(isTaskLocksValid(task, intervals));
- }
- finally {
- giant.unlock();
- }
- }
-
- /**
- * Check all locks task acquired are still valid.
- * It doesn't check other semantics like acquired locks are enough to
overwrite existing segments.
- * This kind of semantic should be checked in each caller of {@link
#doInCriticalSection}.
- */
- private boolean isTaskLocksValid(Task task, Set<Interval> intervals)
- {
- giant.lock();
- try {
- return intervals
- .stream()
- .allMatch(interval -> {
- final List<TaskLockPosse> lockPosses =
getOnlyTaskLockPosseContainingInterval(task, interval);
- return
lockPosses.stream().map(TaskLockPosse::getTaskLock).noneMatch(
- TaskLock::isRevoked
- );
- });
+ // Check if any of the locks held by this task have been revoked
+ final boolean areTaskLocksValid = intervals.stream().noneMatch(interval
-> {
+ Optional<TaskLockPosse> lockPosse =
getOnlyTaskLockPosseContainingInterval(task, interval);
+ return lockPosse.isPresent() &&
lockPosse.get().getTaskLock().isRevoked();
+ });
+ return action.perform(areTaskLocksValid);
}
finally {
giant.unlock();
@@ -801,7 +782,7 @@ public class TaskLockbox
giant.lock();
try {
- lockPosse.forEachTask(taskId -> revokeLock(taskId,
lockPosse.getTaskLock()));
+ lockPosse.taskIds.forEach(taskId -> revokeLock(taskId,
lockPosse.getTaskLock()));
}
finally {
giant.unlock();
@@ -1083,22 +1064,20 @@ public class TaskLockbox
* @param task task to unlock
* @param interval interval to unlock
*/
- public void unlock(final Task task, final Interval interval, @Nullable
Integer partitionId)
+ private void unlock(final Task task, final Interval interval, @Nullable
Integer partitionId)
{
giant.lock();
try {
final String dataSource = task.getDataSource();
- final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>
dsRunning = running.get(
- task.getDataSource()
- );
-
- if (dsRunning == null || dsRunning.isEmpty()) {
+ final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>
locksForDatasource
+ = running.get(task.getDataSource());
+ if (locksForDatasource == null || locksForDatasource.isEmpty()) {
return;
}
- final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses =
dsRunning.get(interval.getStart());
-
+ final SortedMap<Interval, List<TaskLockPosse>> intervalToPosses
+ = locksForDatasource.get(interval.getStart());
if (intervalToPosses == null || intervalToPosses.isEmpty()) {
return;
}
@@ -1126,18 +1105,15 @@ public class TaskLockbox
final boolean removed = taskLockPosse.removeTask(task);
if (taskLockPosse.isTasksEmpty()) {
- log.info("TaskLock is now empty: %s", taskLock);
+ log.info("TaskLock[%s] is now empty.", taskLock);
possesHolder.remove(taskLockPosse);
}
-
if (possesHolder.isEmpty()) {
intervalToPosses.remove(interval);
}
-
if (intervalToPosses.isEmpty()) {
- dsRunning.remove(interval.getStart());
+ locksForDatasource.remove(interval.getStart());
}
-
if (running.get(dataSource).isEmpty()) {
running.remove(dataSource);
}
@@ -1227,39 +1203,7 @@ public class TaskLockbox
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
- try {
- // Clean upgrade segments table for entries associated with
replacing task
- if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType()
== TaskLockType.REPLACE)) {
- final int upgradeSegmentsDeleted =
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
- log.info(
- "Deleted [%d] entries from upgradeSegments table for task[%s]
with REPLACE locks.",
- upgradeSegmentsDeleted, task.getId()
- );
- }
- // Clean pending segments associated with the appending task
- if (task instanceof PendingSegmentAllocatingTask) {
- final String taskAllocatorId = ((PendingSegmentAllocatingTask)
task).getTaskAllocatorId();
- if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
- final Set<String> idsInSameGroup =
activeAllocatorIdToTaskIds.get(taskAllocatorId);
- idsInSameGroup.remove(task.getId());
- if (idsInSameGroup.isEmpty()) {
- final int pendingSegmentsDeleted
- =
metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(
- task.getDataSource(),
- taskAllocatorId
- );
- log.info(
- "Deleted [%d] entries from pendingSegments table for
pending segments group [%s] with APPEND locks.",
- pendingSegmentsDeleted, taskAllocatorId
- );
- }
- activeAllocatorIdToTaskIds.remove(taskAllocatorId);
- }
- }
- }
- catch (Exception e) {
- log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments
tables.");
- }
+ cleanupUpgradeAndPendingSegments(task);
unlockAll(task);
}
finally {
@@ -1271,33 +1215,63 @@ public class TaskLockbox
}
}
- /**
- * Return the currently-active lock posses for some task.
- *
- * @param task task for which to locate locks
- */
- private List<TaskLockPosse> findLockPossesForTask(final Task task)
+ @GuardedBy("giant")
+ private void cleanupUpgradeAndPendingSegments(Task task)
{
- giant.lock();
-
try {
- // Scan through all locks for this datasource
- final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>
dsRunning = running.get(task.getDataSource());
- if (dsRunning == null) {
- return ImmutableList.of();
- } else {
- return dsRunning.values().stream()
- .flatMap(map -> map.values().stream())
- .flatMap(Collection::stream)
- .filter(taskLockPosse ->
taskLockPosse.containsTask(task))
- .collect(Collectors.toList());
+ // Clean up upgrade segment entries associated with a REPLACE task
+ if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() ==
TaskLockType.REPLACE)) {
+ final int upgradeSegmentsDeleted =
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+ log.info(
+ "Deleted [%d] entries from upgradeSegments table for task[%s] with
REPLACE locks.",
+ upgradeSegmentsDeleted, task.getId()
+ );
+ }
+
+ // Clean up pending segments associated with an APPEND task
+ if (task instanceof PendingSegmentAllocatingTask) {
+ final String taskAllocatorId = ((PendingSegmentAllocatingTask)
task).getTaskAllocatorId();
+ if (activeAllocatorIdToTaskIds.containsKey(taskAllocatorId)) {
+ final Set<String> taskIdsForSameAllocator =
activeAllocatorIdToTaskIds.get(taskAllocatorId);
+ taskIdsForSameAllocator.remove(task.getId());
+
+ if (taskIdsForSameAllocator.isEmpty()) {
+ final int pendingSegmentsDeleted = metadataStorageCoordinator
+ .deletePendingSegmentsForTaskAllocatorId(task.getDataSource(),
taskAllocatorId);
+ log.info(
+ "Deleted [%d] entries from pendingSegments table for
taskAllocatorId[%s].",
+ pendingSegmentsDeleted, taskAllocatorId
+ );
+ }
+ activeAllocatorIdToTaskIds.remove(taskAllocatorId);
+ }
}
}
- finally {
- giant.unlock();
+ catch (Exception e) {
+ log.warn(e, "Failure cleaning up upgradeSegments or pendingSegments
tables.");
}
}
+ /**
+ * Finds all the lock posses for the given task.
+ */
+ @GuardedBy("giant")
+ private List<TaskLockPosse> findLockPossesForTask(final Task task)
+ {
+ // Scan through all locks for this datasource
+ final NavigableMap<DateTime, SortedMap<Interval, List<TaskLockPosse>>>
locksForDatasource
+ = running.get(task.getDataSource());
+ if (locksForDatasource == null) {
+ return Collections.emptyList();
+ }
+
+ return locksForDatasource.values().stream()
+ .flatMap(map -> map.values().stream())
+ .flatMap(Collection::stream)
+ .filter(taskLockPosse ->
taskLockPosse.containsTask(task))
+ .collect(Collectors.toList());
+ }
+
private List<TaskLockPosse> findLockPossesContainingInterval(final String
dataSource, final Interval interval)
{
giant.lock();
@@ -1342,19 +1316,7 @@ public class TaskLockbox
}
@VisibleForTesting
- List<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task,
Interval interval)
- {
- giant.lock();
- try {
- return getOnlyTaskLockPosseContainingInterval(task, interval,
Collections.emptySet());
- }
- finally {
- giant.unlock();
- }
- }
-
- @VisibleForTesting
- List<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task,
Interval interval, Set<Integer> partitionIds)
+ Optional<TaskLockPosse> getOnlyTaskLockPosseContainingInterval(Task task,
Interval interval)
{
giant.lock();
try {
@@ -1364,35 +1326,20 @@ public class TaskLockbox
.collect(Collectors.toList());
if (filteredPosses.isEmpty()) {
- throw new ISE("Cannot find locks for task[%s] and interval[%s]",
task.getId(), interval);
- } else if (filteredPosses.size() > 1) {
- if (filteredPosses.stream()
- .anyMatch(posse ->
posse.getTaskLock().getGranularity() == LockGranularity.TIME_CHUNK)) {
- throw new ISE(
- "There are multiple timeChunk lockPosses for task[%s] and
interval[%s]?",
- task.getId(),
- interval
- );
- } else {
- final Map<Integer, TaskLockPosse> partitionIdsOfLocks = new
HashMap<>();
- for (TaskLockPosse posse : filteredPosses) {
- final SegmentLock segmentLock = (SegmentLock) posse.getTaskLock();
- partitionIdsOfLocks.put(segmentLock.getPartitionId(), posse);
- }
-
- if
(partitionIds.stream().allMatch(partitionIdsOfLocks::containsKey)) {
- return
partitionIds.stream().map(partitionIdsOfLocks::get).collect(Collectors.toList());
- } else {
- throw new ISE(
- "Task[%s] doesn't have locks for interval[%s] partitions[%]",
- task.getId(),
- interval,
- partitionIds.stream().filter(pid ->
!partitionIdsOfLocks.containsKey(pid)).collect(Collectors.toList())
- );
- }
- }
+ throw new ISE("Cannot find any lock for task[%s] and interval[%s]",
task.getId(), interval);
+ } else if (filteredPosses.size() == 1) {
+ return Optional.of(filteredPosses.get(0));
+ } else if (
+ filteredPosses.stream().anyMatch(
+ posse -> posse.taskLock.getGranularity() ==
LockGranularity.TIME_CHUNK
+ )
+ ) {
+ throw new ISE(
+ "There are multiple timechunk lockPosses for task[%s] and
interval[%s]",
+ task.getId(), interval
+ );
} else {
- return filteredPosses;
+ return Optional.empty();
}
}
finally {
@@ -1635,17 +1582,13 @@ public class TaskLockbox
Preconditions.checkArgument(
taskLock.getGroupId().equals(task.getGroupId()),
"groupId[%s] of task[%s] is different from the existing
lockPosse's groupId[%s]",
- task.getGroupId(),
- task.getId(),
- taskLock.getGroupId()
+ task.getGroupId(), task.getId(), taskLock.getGroupId()
);
}
Preconditions.checkArgument(
taskLock.getNonNullPriority() == task.getPriority(),
"priority[%s] of task[%s] is different from the existing lockPosse's
priority[%s]",
- task.getPriority(),
- task.getId(),
- taskLock.getNonNullPriority()
+ task.getPriority(), task.getId(), taskLock.getNonNullPriority()
);
return taskIds.add(task.getId());
}
@@ -1724,12 +1667,6 @@ public class TaskLockbox
return false;
}
- void forEachTask(Consumer<String> action)
- {
- Preconditions.checkNotNull(action, "action");
- taskIds.forEach(action);
- }
-
@Override
public boolean equals(Object o)
{
@@ -1801,8 +1738,7 @@ public class TaskLockbox
/**
* Contains the task, request, lock and final result for a segment
allocation.
*/
- @VisibleForTesting
- static class SegmentAllocationHolder
+ private static class SegmentAllocationHolder
{
final AllocationHolderList list;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index e6b20dd073e..a02b5108767 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -73,7 +73,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -83,6 +82,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -91,9 +91,6 @@ public class TaskLockboxTest
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new
TestDerbyConnector.DerbyConnectorRule();
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
private ObjectMapper objectMapper;
private TaskStorage taskStorage;
private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
@@ -105,9 +102,6 @@ public class TaskLockboxTest
private final int MEDIUM_PRIORITY = 10;
private final int LOW_PRIORITY = 5;
- @Rule
- public final ExpectedException exception = ExpectedException.none();
-
@Before
public void setup()
{
@@ -186,14 +180,16 @@ public class TaskLockboxTest
}
@Test
- public void testLockAfterTaskComplete() throws InterruptedException
+ public void testLockAfterTaskComplete()
{
- Task task = NoopTask.create();
- exception.expect(ISE.class);
- exception.expectMessage("Unable to grant lock to inactive Task");
+ final Task task = NoopTask.create();
lockbox.add(task);
lockbox.remove(task);
- acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task,
Intervals.of("2015-01-01/2015-01-02"));
+ ISE exception = Assert.assertThrows(
+ ISE.class,
+ () -> acquireTimeChunkLock(TaskLockType.EXCLUSIVE, task,
Intervals.of("2015-01-01/2015-01-02"))
+ );
+ Assert.assertTrue(exception.getMessage().contains("Unable to grant lock to
inactive Task"));
}
@Test
@@ -311,12 +307,15 @@ public class TaskLockboxTest
@Test
public void testTryLockAfterTaskComplete()
{
- Task task = NoopTask.create();
- exception.expect(ISE.class);
- exception.expectMessage("Unable to grant lock to inactive Task");
+ final Task task = NoopTask.create();
lockbox.add(task);
lockbox.remove(task);
- Assert.assertFalse(tryTimeChunkLock(TaskLockType.EXCLUSIVE, task,
Intervals.of("2015-01-01/2015-01-02")).isOk());
+
+ ISE exception = Assert.assertThrows(
+ ISE.class,
+ () -> tryTimeChunkLock(TaskLockType.EXCLUSIVE, task,
Intervals.of("2015-01-01/2015-01-02"))
+ );
+ Assert.assertTrue(exception.getMessage().contains("Unable to grant lock to
inactive Task"));
}
@Test
@@ -759,23 +758,23 @@ public class TaskLockboxTest
).isOk()
);
- final List<TaskLockPosse> highLockPosses =
lockbox.getOnlyTaskLockPosseContainingInterval(
+ final Optional<TaskLockPosse> highLockPosse =
lockbox.getOnlyTaskLockPosseContainingInterval(
highPriorityTask,
Intervals.of("2018-12-16T09:00:00/2018-12-16T09:30:00")
);
- Assert.assertEquals(1, highLockPosses.size());
- Assert.assertTrue(highLockPosses.get(0).containsTask(highPriorityTask));
- Assert.assertFalse(highLockPosses.get(0).getTaskLock().isRevoked());
+ Assert.assertTrue(highLockPosse.isPresent());
+ Assert.assertTrue(highLockPosse.get().containsTask(highPriorityTask));
+ Assert.assertFalse(highLockPosse.get().getTaskLock().isRevoked());
- final List<TaskLockPosse> lowLockPosses =
lockbox.getOnlyTaskLockPosseContainingInterval(
+ final Optional<TaskLockPosse> lowLockPosse =
lockbox.getOnlyTaskLockPosseContainingInterval(
lowPriorityTask,
Intervals.of("2018-12-16T09:00:00/2018-12-16T10:00:00")
);
- Assert.assertEquals(1, lowLockPosses.size());
- Assert.assertTrue(lowLockPosses.get(0).containsTask(lowPriorityTask));
- Assert.assertTrue(lowLockPosses.get(0).getTaskLock().isRevoked());
+ Assert.assertTrue(lowLockPosse.isPresent());
+ Assert.assertTrue(lowLockPosse.get().containsTask(lowPriorityTask));
+ Assert.assertTrue(lowLockPosse.get().getTaskLock().isRevoked());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]