This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a26defd64bb Clean up stale entries from upgradeSegments table (#15637)
a26defd64bb is described below
commit a26defd64bb487d273433fbd6b3b84b6013b2639
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Jan 17 20:49:52 2024 +0530
Clean up stale entries from upgradeSegments table (#15637)
* Clean up stale entries from upgradeSegments table
---
.../druid/indexing/overlord/TaskLockbox.java | 8 ++++++
.../druid/indexing/overlord/TaskLockboxTest.java | 30 +++++++++++++++++++++
.../TestIndexerMetadataStorageCoordinator.java | 6 +++++
.../IndexerMetadataStorageCoordinator.java | 7 +++++
.../IndexerSQLMetadataStorageCoordinator.java | 16 +++++++++++
.../IndexerSQLMetadataStorageCoordinatorTest.java | 31 ++++++++++++++++++++++
6 files changed, 98 insertions(+)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 54191adf05d..da64198dd00 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1220,6 +1220,14 @@ public class TaskLockbox
try {
try {
log.info("Removing task[%s] from activeTasks", task.getId());
+ if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == TaskLockType.REPLACE)) {
+ final int upgradeSegmentsDeleted = metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+ log.info(
+ "Deleted [%d] entries from upgradeSegments table for task[%s] with REPLACE locks.",
+ upgradeSegmentsDeleted,
+ task.getId()
+ );
+ }
unlockAll(task);
}
finally {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 14597bd7270..17ced86cfa3 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -1919,6 +1919,36 @@ public class TaskLockboxTest
Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
}
+ @Test
+ public void testUpgradeSegmentsCleanupOnUnlock()
+ {
+ final Task replaceTask = NoopTask.create();
+ final Task appendTask = NoopTask.create();
+ final IndexerSQLMetadataStorageCoordinator coordinator
+ = EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class);
+ // Only the replaceTask should attempt a delete on the upgradeSegments table
+ EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
+ EasyMock.replay(coordinator);
+
+ final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
+
+ taskLockbox.add(replaceTask);
+ taskLockbox.tryLock(
+ replaceTask,
+ new TimeChunkLockRequest(TaskLockType.REPLACE, replaceTask, Intervals.of("2024/2025"), "v0")
+ );
+
+ taskLockbox.add(appendTask);
+ taskLockbox.tryLock(
+ appendTask,
+ new TimeChunkLockRequest(TaskLockType.APPEND, appendTask, Intervals.of("2024/2025"), "v0")
+ );
+
+ taskLockbox.remove(replaceTask);
+ taskLockbox.remove(appendTask);
+
+ EasyMock.verify(coordinator);
+ }
private class TaskLockboxValidator
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index bca79a559af..2fc80adceac 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -277,6 +277,12 @@ public class TestIndexerMetadataStorageCoordinator implements IndexerMetadataSto
return null;
}
+ @Override
+ public int deleteUpgradeSegmentsForTask(final String taskId)
+ {
+ throw new UnsupportedOperationException();
+ }
+
public Set<DataSegment> getPublished()
{
return ImmutableSet.copyOf(published);
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 2a032545d72..dd0f7d8c98a 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -443,4 +443,11 @@ public interface IndexerMetadataStorageCoordinator
* @return DataSegment used segment corresponding to given id
*/
DataSegment retrieveSegmentForId(String id, boolean includeUnused);
+
+ /**
+ * Delete entries from the upgrade segments table after the corresponding replace task has ended
+ * @param taskId - id of the task with replace locks
+ * @return number of deleted entries from the metadata store
+ */
+ int deleteUpgradeSegmentsForTask(String taskId);
}
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 252386c56bd..69dca46ea1c 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2707,6 +2707,22 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
);
}
+ @Override
+ public int deleteUpgradeSegmentsForTask(final String taskId)
+ {
+ return connector.getDBI().inTransaction(
+ (handle, status) -> handle
+ .createStatement(
+ StringUtils.format(
+ "DELETE FROM %s WHERE task_id = :task_id",
+ dbTables.getUpgradeSegmentsTable()
+ )
+ )
+ .bind("task_id", taskId)
+ .execute()
+ );
+ }
+
private static class PendingSegmentsRecord
{
private final String sequenceName;
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 9132fb23630..4c3534feacd 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -981,6 +981,37 @@ public class IndexerSQLMetadataStorageCoordinatorTest
Assert.assertEquals(defaultSegment, coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true));
}
+ @Test
+ public void testCleanUpgradeSegmentsTableForTask()
+ {
+ final String taskToClean = "taskToClean";
+ final ReplaceTaskLock replaceLockToClean = new ReplaceTaskLock(
+ taskToClean,
+ Intervals.of("2023-01-01/2023-02-01"),
+ "2023-03-01"
+ );
+ DataSegment segmentToClean0 = createSegment(
+ Intervals.of("2023-01-01/2023-02-01"),
+ "2023-02-01",
+ new NumberedShardSpec(0, 0)
+ );
+ DataSegment segmentToClean1 = createSegment(
+ Intervals.of("2023-01-01/2023-01-02"),
+ "2023-01-02",
+ new NumberedShardSpec(0, 0)
+ );
+ insertIntoUpgradeSegmentsTable(
+ ImmutableMap.of(segmentToClean0, replaceLockToClean, segmentToClean1, replaceLockToClean)
+ );
+
+ // Unrelated task should not result in clean up
+ Assert.assertEquals(0, coordinator.deleteUpgradeSegmentsForTask("someRandomTask"));
+ // The two segment entries are deleted
+ Assert.assertEquals(2, coordinator.deleteUpgradeSegmentsForTask(taskToClean));
+ // Nothing further to delete
+ Assert.assertEquals(0, coordinator.deleteUpgradeSegmentsForTask(taskToClean));
+ }
+
@Test
public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]