This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a26defd64bb Clean up stale entries from upgradeSegments table (#15637)
a26defd64bb is described below

commit a26defd64bb487d273433fbd6b3b84b6013b2639
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Wed Jan 17 20:49:52 2024 +0530

    Clean up stale entries from upgradeSegments table (#15637)
    
    * Clean up stale entries from upgradeSegments table
---
 .../druid/indexing/overlord/TaskLockbox.java       |  8 ++++++
 .../druid/indexing/overlord/TaskLockboxTest.java   | 30 +++++++++++++++++++++
 .../TestIndexerMetadataStorageCoordinator.java     |  6 +++++
 .../IndexerMetadataStorageCoordinator.java         |  7 +++++
 .../IndexerSQLMetadataStorageCoordinator.java      | 16 +++++++++++
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 31 ++++++++++++++++++++++
 6 files changed, 98 insertions(+)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 54191adf05d..da64198dd00 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1220,6 +1220,14 @@ public class TaskLockbox
     try {
       try {
         log.info("Removing task[%s] from activeTasks", task.getId());
+        if (findLocksForTask(task).stream().anyMatch(lock -> lock.getType() == 
TaskLockType.REPLACE)) {
+          final int upgradeSegmentsDeleted = 
metadataStorageCoordinator.deleteUpgradeSegmentsForTask(task.getId());
+          log.info(
+              "Deleted [%d] entries from upgradeSegments table for task[%s] 
with REPLACE locks.",
+              upgradeSegmentsDeleted,
+              task.getId()
+          );
+        }
         unlockAll(task);
       }
       finally {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 14597bd7270..17ced86cfa3 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -1919,6 +1919,36 @@ public class TaskLockboxTest
     Assert.assertTrue(testLockbox.getAllLocks().isEmpty());
   }
 
+  @Test
+  public void testUpgradeSegmentsCleanupOnUnlock()
+  {
+    final Task replaceTask = NoopTask.create();
+    final Task appendTask = NoopTask.create();
+    final IndexerSQLMetadataStorageCoordinator coordinator
+        = EasyMock.createMock(IndexerSQLMetadataStorageCoordinator.class);
+    // Only the replaceTask should attempt a delete on the upgradeSegments 
table
+    
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
+    EasyMock.replay(coordinator);
+
+    final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
+
+    taskLockbox.add(replaceTask);
+    taskLockbox.tryLock(
+        replaceTask,
+        new TimeChunkLockRequest(TaskLockType.REPLACE, replaceTask, 
Intervals.of("2024/2025"), "v0")
+    );
+
+    taskLockbox.add(appendTask);
+    taskLockbox.tryLock(
+        appendTask,
+        new TimeChunkLockRequest(TaskLockType.APPEND, appendTask, 
Intervals.of("2024/2025"), "v0")
+    );
+
+    taskLockbox.remove(replaceTask);
+    taskLockbox.remove(appendTask);
+
+    EasyMock.verify(coordinator);
+  }
 
   private class TaskLockboxValidator
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index bca79a559af..2fc80adceac 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -277,6 +277,12 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
     return null;
   }
 
+  @Override
+  public int deleteUpgradeSegmentsForTask(final String taskId)
+  {
+    throw new UnsupportedOperationException();
+  }
+
   public Set<DataSegment> getPublished()
   {
     return ImmutableSet.copyOf(published);
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index 2a032545d72..dd0f7d8c98a 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -443,4 +443,11 @@ public interface IndexerMetadataStorageCoordinator
    * @return DataSegment used segment corresponding to given id
    */
   DataSegment retrieveSegmentForId(String id, boolean includeUnused);
+
+  /**
+   * Delete entries from the upgrade segments table after the corresponding 
replace task has ended
+   * @param taskId - id of the task with replace locks
+   * @return number of deleted entries from the metadata store
+   */
+  int deleteUpgradeSegmentsForTask(String taskId);
 }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 252386c56bd..69dca46ea1c 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2707,6 +2707,22 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
     );
   }
 
+  @Override
+  public int deleteUpgradeSegmentsForTask(final String taskId)
+  {
+    return connector.getDBI().inTransaction(
+        (handle, status) -> handle
+            .createStatement(
+                StringUtils.format(
+                    "DELETE FROM %s WHERE task_id = :task_id",
+                    dbTables.getUpgradeSegmentsTable()
+                )
+            )
+            .bind("task_id", taskId)
+            .execute()
+    );
+  }
+
   private static class PendingSegmentsRecord
   {
     private final String sequenceName;
diff --git 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 9132fb23630..4c3534feacd 100644
--- 
a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -981,6 +981,37 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     Assert.assertEquals(defaultSegment, 
coordinator.retrieveSegmentForId(defaultSegment.getId().toString(), true));
   }
 
+  @Test
+  public void testCleanUpgradeSegmentsTableForTask()
+  {
+    final String taskToClean = "taskToClean";
+    final ReplaceTaskLock replaceLockToClean = new ReplaceTaskLock(
+        taskToClean,
+        Intervals.of("2023-01-01/2023-02-01"),
+        "2023-03-01"
+    );
+    DataSegment segmentToClean0 = createSegment(
+        Intervals.of("2023-01-01/2023-02-01"),
+        "2023-02-01",
+        new NumberedShardSpec(0, 0)
+    );
+    DataSegment segmentToClean1 = createSegment(
+        Intervals.of("2023-01-01/2023-01-02"),
+        "2023-01-02",
+        new NumberedShardSpec(0, 0)
+    );
+    insertIntoUpgradeSegmentsTable(
+        ImmutableMap.of(segmentToClean0, replaceLockToClean, segmentToClean1, 
replaceLockToClean)
+    );
+
+    // Unrelated task should not result in clean up
+    Assert.assertEquals(0, 
coordinator.deleteUpgradeSegmentsForTask("someRandomTask"));
+    // The two segment entries are deleted
+    Assert.assertEquals(2, 
coordinator.deleteUpgradeSegmentsForTask(taskToClean));
+    // Nothing further to delete
+    Assert.assertEquals(0, 
coordinator.deleteUpgradeSegmentsForTask(taskToClean));
+  }
+
   @Test
   public void testTransactionalAnnounceFailDbNotNullWantDifferent() throws 
IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to