This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ef46d882007 Release unneeded append locks after acquiring a new
superseding append lock (#15682)
ef46d882007 is described below
commit ef46d8820077cf9e7fc436f5c7e3219b42eb3077
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Jan 30 16:51:56 2024 +0530
Release unneeded append locks after acquiring a new superseding append lock
(#15682)
* Fix segment transactional append when publishing with multiple
overlapping locks
---
.../druid/indexing/overlord/TaskLockbox.java | 212 ++++++++-------------
.../concurrent/ConcurrentReplaceAndAppendTest.java | 38 ++++
.../druid/indexing/overlord/TaskLockboxTest.java | 191 +++++++------------
3 files changed, 195 insertions(+), 246 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index da64198dd00..54e29191cff 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -175,8 +175,6 @@ public class TaskLockbox
savedTaskLockWithPriority
);
if (taskLockPosse != null) {
- taskLockPosse.addTask(task);
-
final TaskLock taskLock = taskLockPosse.getTaskLock();
if
(savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
@@ -299,7 +297,7 @@ public class TaskLockbox
throw new ISE("Unknown lockGranularity[%s]",
taskLock.getGranularity());
}
- return createOrFindLockPosse(request);
+ return createOrFindLockPosse(request, task, false);
}
catch (Exception e) {
log.error(e,
@@ -401,7 +399,7 @@ public class TaskLockbox
convertedRequest = request;
}
- final TaskLockPosse posseToUse = createOrFindLockPosse(convertedRequest);
+ final TaskLockPosse posseToUse = createOrFindLockPosse(convertedRequest,
task, true);
if (posseToUse != null && !posseToUse.getTaskLock().isRevoked()) {
if (request instanceof LockRequestForNewSegment) {
final LockRequestForNewSegment lockRequestForNewSegment =
(LockRequestForNewSegment) request;
@@ -416,36 +414,7 @@ public class TaskLockbox
newSegmentId = allocateSegmentId(lockRequestForNewSegment,
posseToUse.getTaskLock().getVersion());
}
}
-
- // Add to existing TaskLockPosse, if necessary
- if (posseToUse.addTask(task)) {
- log.info("Added task[%s] to TaskLock[%s]", task.getId(),
posseToUse.getTaskLock());
-
- // Update task storage facility. If it fails, revoke the lock.
- try {
- taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
- return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
- }
- catch (Exception e) {
- log.makeAlert("Failed to persist lock in storage")
- .addData("task", task.getId())
- .addData("dataSource", posseToUse.getTaskLock().getDataSource())
- .addData("interval", posseToUse.getTaskLock().getInterval())
- .addData("version", posseToUse.getTaskLock().getVersion())
- .emit();
- unlock(
- task,
- convertedRequest.getInterval(),
- posseToUse.getTaskLock().getGranularity() ==
LockGranularity.SEGMENT
- ? ((SegmentLock) posseToUse.taskLock).getPartitionId()
- : null
- );
- return LockResult.fail();
- }
- } else {
- log.info("Task[%s] already present in TaskLock[%s]", task.getId(),
posseToUse.getTaskLock().getGroupId());
- return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
- }
+ return LockResult.ok(posseToUse.getTaskLock(), newSegmentId);
} else {
final boolean lockRevoked = posseToUse != null &&
posseToUse.getTaskLock().isRevoked();
if (lockRevoked) {
@@ -499,11 +468,7 @@ public class TaskLockbox
allocateSegmentIds(dataSource, interval, skipSegmentLineageCheck,
holderList.getPending());
holderList.getPending().forEach(holder -> acquireTaskLock(holder,
false));
}
- holderList.getPending().forEach(holder -> addTaskAndPersistLocks(holder,
isTimeChunkLock));
- }
- catch (Exception e) {
- holderList.clearStaleLocks(this);
- throw e;
+ holderList.getPending().forEach(SegmentAllocationHolder::markSucceeded);
}
finally {
giant.unlock();
@@ -538,7 +503,7 @@ public class TaskLockbox
}
// Create or find the task lock for the created lock request
- final TaskLockPosse posseToUse = createOrFindLockPosse(lockRequest);
+ final TaskLockPosse posseToUse = createOrFindLockPosse(lockRequest,
holder.task, true);
final TaskLock acquiredLock = posseToUse == null ? null :
posseToUse.getTaskLock();
if (posseToUse == null) {
holder.markFailed("Could not find or create lock posse.");
@@ -549,60 +514,14 @@ public class TaskLockbox
}
}
- /**
- * Adds the task to the found lock posse if not already added and updates
- * in the metadata store. Marks the segment allocation as failed if the
update
- * did not succeed.
- */
- private void addTaskAndPersistLocks(SegmentAllocationHolder holder, boolean
isTimeChunkLock)
- {
- final Task task = holder.task;
- final TaskLock acquiredLock = holder.acquiredLock;
-
- if (holder.taskLockPosse.addTask(task)) {
- log.info("Added task [%s] to TaskLock [%s]", task.getId(), acquiredLock);
-
- // This can also be batched later
- boolean success = updateLockInStorage(task, acquiredLock);
- if (success) {
- holder.markSucceeded();
- } else {
- final Integer partitionId = isTimeChunkLock
- ? null : ((SegmentLock)
acquiredLock).getPartitionId();
- unlock(task, holder.lockRequestInterval, partitionId);
- holder.markFailed("Could not update task lock in metadata store.");
- }
- } else {
- log.info("Task [%s] already present in TaskLock [%s]", task.getId(),
acquiredLock.getGroupId());
- holder.markSucceeded();
- }
- }
-
- private boolean updateLockInStorage(Task task, TaskLock taskLock)
- {
- try {
- taskStorage.addLock(task.getId(), taskLock);
- return true;
- }
- catch (Exception e) {
- log.makeAlert("Failed to persist lock in storage")
- .addData("task", task.getId())
- .addData("dataSource", taskLock.getDataSource())
- .addData("interval", taskLock.getInterval())
- .addData("version", taskLock.getVersion())
- .emit();
-
- return false;
- }
- }
-
- private TaskLockPosse createOrFindLockPosse(LockRequest request)
+ private TaskLockPosse createOrFindLockPosse(LockRequest request, Task task,
boolean persist)
{
Preconditions.checkState(!(request instanceof LockRequestForNewSegment),
"Can't handle LockRequestForNewSegment");
giant.lock();
try {
+ final TaskLockPosse posseToUse;
final List<TaskLockPosse> foundPosses = findLockPossesOverlapsInterval(
request.getDataSource(),
request.getInterval()
@@ -613,7 +532,7 @@ public class TaskLockbox
.filter(taskLockPosse ->
taskLockPosse.getTaskLock().conflict(request))
.collect(Collectors.toList());
- if (conflictPosses.size() > 0) {
+ if (!conflictPosses.isEmpty()) {
// If we have some locks for dataSource and interval, check they can
be reused.
// If they can't be reused, check lock priority and revoke existing
locks if possible.
final List<TaskLockPosse> reusablePosses = foundPosses
@@ -621,7 +540,7 @@ public class TaskLockbox
.filter(posse -> posse.reusableFor(request))
.collect(Collectors.toList());
- if (reusablePosses.size() == 0) {
+ if (reusablePosses.isEmpty()) {
// case 1) this task doesn't have any lock, but others do
if ((request.getType().equals(TaskLockType.APPEND) ||
request.getType().equals(TaskLockType.REPLACE))
@@ -631,14 +550,10 @@ public class TaskLockbox
}
// First, check if the lock can coexist with its conflicting posses
- if (canLockCoexist(conflictPosses, request)) {
- return createNewTaskLockPosse(request);
- }
-
// If not, revoke all lower priority locks of different types if the
request has a greater priority
- if (revokeAllIncompatibleActiveLocksIfPossible(conflictPosses,
request)) {
- return createNewTaskLockPosse(request);
-
+ if (canLockCoexist(conflictPosses, request)
+ || revokeAllIncompatibleActiveLocksIfPossible(conflictPosses,
request)) {
+ posseToUse = createNewTaskLockPosse(request);
} else {
// During a rolling update, tasks of mixed versions can be run at
the same time. Old tasks would request
// timeChunkLocks while new tasks would ask segmentLocks. The
below check is to allow for old and new tasks
@@ -653,7 +568,7 @@ public class TaskLockbox
if (allDifferentGranularity) {
// Lock collision was because of the different granularity in
the same group.
// We can add a new taskLockPosse.
- return createNewTaskLockPosse(request);
+ posseToUse = createNewTaskLockPosse(request);
} else {
log.info(
"Cannot create a new taskLockPosse for request[%s] because
existing locks[%s] have same or higher priorities",
@@ -665,7 +580,7 @@ public class TaskLockbox
}
} else if (reusablePosses.size() == 1) {
// case 2) we found a lock posse for the given request
- return reusablePosses.get(0);
+ posseToUse = reusablePosses.get(0);
} else {
// case 3) we found multiple lock posses for the given task
throw new ISE(
@@ -677,8 +592,49 @@ public class TaskLockbox
} else {
// We don't have any locks for dataSource and interval.
// Let's make a new one.
- return createNewTaskLockPosse(request);
+ posseToUse = createNewTaskLockPosse(request);
+ }
+ if (posseToUse == null || posseToUse.getTaskLock() == null) {
+ return null;
+ }
+ // Add to existing TaskLockPosse
+ if (posseToUse.addTask(task)) {
+ log.info("Added task[%s] to TaskLock[%s]", task.getId(),
posseToUse.getTaskLock());
+
+      // If the task lock can be used instead of the conflicting posses,
their locks can be released
+ for (TaskLockPosse conflictPosse : conflictPosses) {
+ if (conflictPosse.containsTask(task) &&
posseToUse.supersedes(conflictPosse)) {
+ unlock(task, conflictPosse.getTaskLock().getInterval());
+ }
+ }
+
+ if (persist) {
+ // Update task storage facility. If it fails, unlock it
+ try {
+ taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
+ }
+ catch (Exception e) {
+ log.makeAlert("Failed to persist lock in storage")
+ .addData("task", task.getId())
+ .addData("dataSource", posseToUse.getTaskLock().getDataSource())
+ .addData("interval", posseToUse.getTaskLock().getInterval())
+ .addData("version", posseToUse.getTaskLock().getVersion())
+ .emit();
+ unlock(
+ task,
+ posseToUse.getTaskLock().getInterval(),
+ posseToUse.getTaskLock().getGranularity() ==
LockGranularity.SEGMENT
+ ? ((SegmentLock) posseToUse.taskLock).getPartitionId()
+ : null
+ );
+ return null;
+ }
+ }
+
+ } else {
+ log.info("Task[%s] already present in TaskLock[%s]", task.getId(),
posseToUse.getTaskLock().getGroupId());
}
+ return posseToUse;
}
finally {
giant.unlock();
@@ -1144,7 +1100,7 @@ public class TaskLockbox
dsRunning.remove(interval.getStart());
}
- if (running.get(dataSource).size() == 0) {
+ if (running.get(dataSource).isEmpty()) {
running.remove(dataSource);
}
@@ -1525,14 +1481,14 @@ public class TaskLockbox
}
switch (type) {
case EXCLUSIVE:
- if (posse.getTaskLock().getPriority() >= priority) {
+ if (posse.getTaskLock().getNonNullPriority() >= priority) {
return false;
}
possesToRevoke.add(posse);
break;
case SHARED:
if (!posse.getTaskLock().getType().equals(TaskLockType.SHARED)) {
- if (posse.getTaskLock().getPriority() >= priority) {
+ if (posse.getTaskLock().getNonNullPriority() >= priority) {
return false;
}
possesToRevoke.add(posse);
@@ -1541,7 +1497,7 @@ public class TaskLockbox
case REPLACE:
if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
&&
request.getInterval().contains(posse.getTaskLock().getInterval()))) {
- if (posse.getTaskLock().getPriority() >= priority) {
+ if (posse.getTaskLock().getNonNullPriority() >= priority) {
return false;
}
possesToRevoke.add(posse);
@@ -1551,7 +1507,7 @@ public class TaskLockbox
if (!(posse.getTaskLock().getType().equals(TaskLockType.APPEND)
|| (posse.getTaskLock().getType().equals(TaskLockType.REPLACE)
&&
posse.getTaskLock().getInterval().contains(request.getInterval())))) {
- if (posse.getTaskLock().getPriority() >= priority) {
+ if (posse.getTaskLock().getNonNullPriority() >= priority) {
return false;
}
possesToRevoke.add(posse);
@@ -1635,6 +1591,29 @@ public class TaskLockbox
return taskIds.isEmpty();
}
+ /**
+ * Checks if an APPEND time chunk lock can be reused for another append
time chunk lock that already exists
+ * and has an interval that strictly contains the other's interval
+     * We do not expect multiple locks to exist with the same interval, as the
existing lock would be reused.
+ * A new append lock with a strictly encompassing interval can be created
when a concurrent replace
+ * with a coarser granularity commits its segments and the appending task
makes subsequent allocations
+ * @param other the conflicting lockPosse that already exists
+ * @return true if the task can be unlocked from the other posse after it
has been added to the newly created posse.
+ */
+ boolean supersedes(TaskLockPosse other)
+ {
+ final TaskLock otherLock = other.taskLock;
+ return !taskLock.isRevoked()
+ && taskLock.getGranularity() == LockGranularity.TIME_CHUNK
+ && taskLock.getGranularity() == otherLock.getGranularity()
+ && taskLock.getType() == TaskLockType.APPEND
+ && taskLock.getType() == otherLock.getType()
+ && taskLock.getVersion().compareTo(otherLock.getVersion()) >= 0
+ && !taskLock.getInterval().equals(otherLock.getInterval())
+ && taskLock.getInterval().contains(otherLock.getInterval())
+ && taskLock.getGroupId().equals(otherLock.getGroupId());
+ }
+
boolean reusableFor(LockRequest request)
{
if (taskLock.getType() == request.getType() && taskLock.getGranularity()
== request.getGranularity()) {
@@ -1737,29 +1716,6 @@ public class TaskLockbox
return pending;
}
- /**
- * When task locks are acquired in an attempt to allocate segments, * a
new lock posse might be created.
- * However, the posse is associated with the task only after all the
segment allocations have succeeded.
- * If there is an exception, unlock all such unassociated locks.
- */
- void clearStaleLocks(TaskLockbox taskLockbox)
- {
- all
- .stream()
- .filter(holder -> holder.acquiredLock != null
- && holder.taskLockPosse != null
- && !holder.taskLockPosse.containsTask(holder.task))
- .forEach(holder -> {
- holder.taskLockPosse.addTask(holder.task);
- taskLockbox.unlock(
- holder.task,
- holder.acquiredLock.getInterval(),
- holder.acquiredLock instanceof SegmentLock ? ((SegmentLock)
holder.acquiredLock).getPartitionId() : null
- );
- log.info("Cleared stale lock[%s] for task[%s]",
holder.acquiredLock, holder.task.getId());
- });
- }
-
List<SegmentAllocateResult> getResults()
{
return all.stream().map(holder ->
holder.result).collect(Collectors.toList());
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
index 3fda953a454..91c7d6a7175 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/concurrent/ConcurrentReplaceAndAppendTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.common.task.concurrent;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import org.apache.druid.indexing.common.MultipleFileTaskReportFileWriter;
import org.apache.druid.indexing.common.TaskLock;
@@ -893,6 +894,43 @@ public class ConcurrentReplaceAndAppendTest extends
IngestionTestBase
verifyInputSegments(replaceTask, JAN_23, segment1, segment2, segment3);
}
+ @Test
+ public void testLockAllocateDayReplaceMonthAllocateAppend()
+ {
+ final SegmentIdWithShardSpec pendingSegmentV0
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(),
Granularities.DAY);
+
+ final String v1 = replaceTask.acquireReplaceLockOn(JAN_23).getVersion();
+
+ final DataSegment segmentV10 = createSegment(JAN_23, v1);
+ replaceTask.commitReplaceSegments(segmentV10);
+ verifyIntervalHasUsedSegments(JAN_23, segmentV10);
+
+ final SegmentIdWithShardSpec pendingSegmentV1
+ = appendTask.allocateSegmentForTimestamp(FIRST_OF_JAN_23.getStart(),
Granularities.DAY);
+ Assert.assertEquals(segmentV10.getVersion(),
pendingSegmentV1.getVersion());
+
+ final DataSegment segmentV00 = asSegment(pendingSegmentV0);
+ final DataSegment segmentV11 = asSegment(pendingSegmentV1);
+ Set<DataSegment> appendSegments =
appendTask.commitAppendSegments(segmentV00, segmentV11)
+ .getSegments();
+
+ Assert.assertEquals(3, appendSegments.size());
+ // Segment V11 is committed
+ Assert.assertTrue(appendSegments.remove(segmentV11));
+ // Segment V00 is also committed
+ Assert.assertTrue(appendSegments.remove(segmentV00));
+    // Segment V00 is upgraded to v1 with MONTH granularity at the time of
commit as V12
+ final DataSegment segmentV12 = Iterables.getOnlyElement(appendSegments);
+ Assert.assertEquals(v1, segmentV12.getVersion());
+ Assert.assertEquals(JAN_23, segmentV12.getInterval());
+ Assert.assertEquals(segmentV00.getLoadSpec(), segmentV12.getLoadSpec());
+
+ verifyIntervalHasUsedSegments(JAN_23, segmentV00, segmentV10, segmentV11,
segmentV12);
+ verifyIntervalHasVisibleSegments(JAN_23, segmentV10, segmentV11,
segmentV12);
+ }
+
+
@Nullable
private DataSegment findSegmentWith(String version, Map<String, Object>
loadSpec, Set<DataSegment> segments)
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 17ced86cfa3..c4ee78ea6a8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -36,8 +36,6 @@ import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.TimeChunkLock;
-import org.apache.druid.indexing.common.actions.SegmentAllocateAction;
-import org.apache.druid.indexing.common.actions.SegmentAllocateRequest;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.indexing.common.config.TaskStorageConfig;
@@ -51,7 +49,6 @@ import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.metadata.DerbyMetadataStorageActionHandlerFactory;
@@ -152,7 +149,12 @@ public class TaskLockboxTest
private LockResult tryTimeChunkLock(TaskLockType lockType, Task task,
Interval interval)
{
- return lockbox.tryLock(task, new TimeChunkLockRequest(lockType, task,
interval, null));
+ return tryTimeChunkLock(lockType, task, interval, null);
+ }
+
+ private LockResult tryTimeChunkLock(TaskLockType lockType, Task task,
Interval interval, String version)
+ {
+ return lockbox.tryLock(task, new TimeChunkLockRequest(lockType, task,
interval, version));
}
@Test
@@ -1053,7 +1055,7 @@ public class TaskLockboxTest
Assert.assertEquals(lockRequest.getDataSource(),
segmentLock.getDataSource());
Assert.assertEquals(lockRequest.getInterval(), segmentLock.getInterval());
Assert.assertEquals(lockRequest.getPartialShardSpec().getShardSpecClass(),
segmentId.getShardSpec().getClass());
- Assert.assertEquals(lockRequest.getPriority(), lockRequest.getPriority());
+ Assert.assertEquals(lockRequest.getPriority(),
segmentLock.getPriority().intValue());
}
@Test
@@ -1809,114 +1811,89 @@ public class TaskLockboxTest
}
@Test
- public void testDoNotCleanUsedLockAfterSegmentAllocationFailure()
+ public void testUnlockSupersededLocks()
{
final Task task = NoopTask.create();
- final Interval theInterval = Intervals.of("2023/2024");
taskStorage.insert(task, TaskStatus.running(task.getId()));
+ lockbox.add(task);
+ final Task otherTask = NoopTask.create();
+ taskStorage.insert(otherTask, TaskStatus.running(otherTask.getId()));
+ lockbox.add(otherTask);
- final TaskLockbox testLockbox = new
SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
- testLockbox.add(task);
- final LockResult lockResult = testLockbox.tryLock(task, new
TimeChunkLockRequest(
- TaskLockType.SHARED,
- task,
- theInterval,
- null
- ));
- Assert.assertTrue(lockResult.isOk());
-
- SegmentAllocateRequest request = new SegmentAllocateRequest(
+ // Can coexist and is superseded. Will be unlocked
+ final TaskLock supersededLock = tryTimeChunkLock(
+ TaskLockType.APPEND,
task,
- new SegmentAllocateAction(
- task.getDataSource(),
- DateTimes.of("2023-01-01"),
- Granularities.NONE,
- Granularities.YEAR,
- task.getId(),
- null,
- false,
- null,
- null,
- TaskLockType.SHARED
- ),
- 90
+ Intervals.of("2024-01-01/2024-01-02"),
+ "v0"
+ ).getTaskLock();
+ Assert.assertEquals(
+ ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+ ImmutableSet.of(supersededLock)
);
- try {
- testLockbox.allocateSegments(
- ImmutableList.of(request),
- "DS",
- theInterval,
- false,
- LockGranularity.TIME_CHUNK
- );
- }
- catch (Exception e) {
- // do nothing
- }
- Assert.assertFalse(testLockbox.getAllLocks().isEmpty());
+ // Can coexist, but is not superseded as the task doesn't belong to this
posse
+ final TaskLock taskNotInPosse = tryTimeChunkLock(
+ TaskLockType.APPEND,
+ otherTask,
+ Intervals.of("2024-01-01/2024-01-02"),
+ "v0"
+ ).getTaskLock();
Assert.assertEquals(
- lockResult.getTaskLock(),
- testLockbox.getOnlyTaskLockPosseContainingInterval(task,
theInterval).get(0).getTaskLock()
+ ImmutableSet.copyOf(taskStorage.getLocks(otherTask.getId())),
+ ImmutableSet.of(taskNotInPosse)
);
- }
-
- @Test
- public void testCleanUpLocksAfterSegmentAllocationFailure()
- {
- final Task task = NoopTask.create();
- taskStorage.insert(task, TaskStatus.running(task.getId()));
- final TaskLockbox testLockbox = new
SegmentAllocationFailingTaskLockbox(taskStorage, metadataStorageCoordinator);
- testLockbox.add(task);
+ // Can coexist, but is not superseded as it is not an APPEND lock
+ final TaskLock replaceLock = tryTimeChunkLock(
+ TaskLockType.REPLACE,
+ task,
+ Intervals.of("2024-01-01/2025-01-01"),
+ "v0"
+ ).getTaskLock();
+ Assert.assertEquals(
+ ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+ ImmutableSet.of(supersededLock, replaceLock)
+ );
- SegmentAllocateRequest request0 = new SegmentAllocateRequest(
+ // Can coexist, but is not superseded due to higher version
+ final TaskLock higherVersion = tryTimeChunkLock(
+ TaskLockType.APPEND,
task,
- new SegmentAllocateAction(
- task.getDataSource(),
- DateTimes.of("2023-01-01"),
- Granularities.NONE,
- Granularities.YEAR,
- task.getId(),
- null,
- false,
- null,
- null,
- TaskLockType.SHARED
- ),
- 90
+ Intervals.of("2024-01-11/2024-01-12"),
+ "v1"
+ ).getTaskLock();
+ Assert.assertEquals(
+ ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+ ImmutableSet.of(supersededLock, replaceLock, higherVersion)
);
- SegmentAllocateRequest request1 = new SegmentAllocateRequest(
+ // Can coexist, but is not superseded as interval is not fully contained
+ final TaskLock uncontainedInterval = tryTimeChunkLock(
+ TaskLockType.APPEND,
task,
- new SegmentAllocateAction(
- task.getDataSource(),
- DateTimes.of("2023-01-01"),
- Granularities.NONE,
- Granularities.MONTH,
- task.getId(),
- null,
- false,
- null,
- null,
- TaskLockType.SHARED
- ),
- 90
+ Intervals.of("2024-01-28/2024-02-04"),
+ "v0"
+ ).getTaskLock();
+ Assert.assertEquals(
+ ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+ ImmutableSet.of(supersededLock, replaceLock, higherVersion,
uncontainedInterval)
);
- try {
- testLockbox.allocateSegments(
- ImmutableList.of(request0, request1),
- "DS",
- Intervals.of("2023/2024"),
- false,
- LockGranularity.TIME_CHUNK
- );
- }
- catch (Exception e) {
- // do nothing
- }
- Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
+ final TaskLock theLock = tryTimeChunkLock(
+ TaskLockType.APPEND,
+ task,
+ Intervals.of("2024-01-01/2024-02-01"),
+ "v0"
+ ).getTaskLock();
+ Assert.assertEquals(
+ ImmutableSet.copyOf(taskStorage.getLocks(task.getId())),
+ ImmutableSet.of(theLock, replaceLock, higherVersion,
uncontainedInterval)
+ );
+ Assert.assertEquals(
+ ImmutableSet.copyOf(taskStorage.getLocks(otherTask.getId())),
+ ImmutableSet.of(taskNotInPosse)
+ );
}
@Test
@@ -2175,26 +2152,4 @@ public class TaskLockboxTest
.contains("FailingLockAcquisition") ? null :
super.verifyAndCreateOrFindLockPosse(task, taskLock);
}
}
-
- private static class SegmentAllocationFailingTaskLockbox extends TaskLockbox
- {
- public SegmentAllocationFailingTaskLockbox(
- TaskStorage taskStorage,
- IndexerMetadataStorageCoordinator metadataStorageCoordinator
- )
- {
- super(taskStorage, metadataStorageCoordinator);
- }
-
- @Override
- void allocateSegmentIds(
- String dataSource,
- Interval interval,
- boolean skipSegmentLineageCheck,
- Collection<SegmentAllocationHolder> holders
- )
- {
- throw new RuntimeException("This lockbox cannot allocate segemnts.");
- }
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]