This is an automated email from the ASF dual-hosted git repository.

amatya pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/30.0.0 by this push:
     new b4514e22197 Add new index on datasource and task_allocator_id for 
pending segments (#16355) (#16357)
b4514e22197 is described below

commit b4514e22197ed3c9b0df6fb950564d20a610547e
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Apr 30 18:49:50 2024 +0530

    Add new index on datasource and task_allocator_id for pending segments 
(#16355) (#16357)
    
    * Add pending segments index on datasource and task_allocator_id
    
    * Use both datasource and task_allocator_id in queries
---
 .../apache/druid/indexing/overlord/TaskLockbox.java |  5 ++++-
 .../druid/indexing/overlord/TaskLockboxTest.java    |  6 ++++--
 .../test/TestIndexerMetadataStorageCoordinator.java |  2 +-
 .../overlord/IndexerMetadataStorageCoordinator.java |  3 ++-
 .../IndexerSQLMetadataStorageCoordinator.java       |  7 ++++---
 .../apache/druid/metadata/SQLMetadataConnector.java | 21 +++++++++++++++++++--
 6 files changed, 34 insertions(+), 10 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 5d71940d470..1a1369ba1e5 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1242,7 +1242,10 @@ public class TaskLockbox
               idsInSameGroup.remove(task.getId());
               if (idsInSameGroup.isEmpty()) {
                 final int pendingSegmentsDeleted
-                    = 
metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId);
+                    = 
metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(
+                        task.getDataSource(),
+                        taskAllocatorId
+                );
                 log.info(
                     "Deleted [%d] entries from pendingSegments table for 
pending segments group [%s] with APPEND locks.",
                     pendingSegmentsDeleted, taskAllocatorId
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 3af74235e82..3c95709a9fc 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -1948,8 +1948,10 @@ public class TaskLockboxTest
     // Only the replaceTask should attempt a delete on the upgradeSegments 
table
     
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
     // Any task may attempt pending segment clean up
-    
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once();
-    
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once();
+    
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getDataSource(),
 replaceTask.getId()))
+            .andReturn(0).once();
+    
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getDataSource(),
 appendTask.getId()))
+            .andReturn(0).once();
     EasyMock.replay(coordinator);
 
     final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 6c4f556133e..3aceae494c6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -297,7 +297,7 @@ public class TestIndexerMetadataStorageCoordinator 
implements IndexerMetadataSto
   }
 
   @Override
-  public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup)
+  public int deletePendingSegmentsForTaskAllocatorId(final String datasource, 
final String taskAllocatorId)
   {
     throw new UnsupportedOperationException();
   }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index aea2674f6b8..da6dd9ffd95 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -492,10 +492,11 @@ public interface IndexerMetadataStorageCoordinator
 
   /**
    * Delete pending segment for a give task group after all the tasks 
belonging to it have completed.
+   * @param datasource datasource of the task
    * @param taskAllocatorId task id / task group / replica group for an 
appending task
    * @return number of pending segments deleted from the metadata store
    */
-  int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId);
+  int deletePendingSegmentsForTaskAllocatorId(String datasource, String 
taskAllocatorId);
 
   /**
    * Fetches all the pending segments of the datasource that overlap with a 
given interval.
diff --git 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index c5a36656c9a..618173a5db7 100644
--- 
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ 
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2813,17 +2813,18 @@ public class IndexerSQLMetadataStorageCoordinator 
implements IndexerMetadataStor
   }
 
   @Override
-  public int deletePendingSegmentsForTaskAllocatorId(final String 
pendingSegmentsGroup)
+  public int deletePendingSegmentsForTaskAllocatorId(final String datasource, 
final String taskAllocatorId)
   {
     return connector.getDBI().inTransaction(
         (handle, status) -> handle
             .createStatement(
                 StringUtils.format(
-                    "DELETE FROM %s WHERE task_allocator_id = 
:task_allocator_id",
+                    "DELETE FROM %s WHERE dataSource = :dataSource AND 
task_allocator_id = :task_allocator_id",
                     dbTables.getPendingSegmentsTable()
                 )
             )
-            .bind("task_allocator_id", pendingSegmentsGroup)
+            .bind("dataSource", datasource)
+            .bind("task_allocator_id", taskAllocatorId)
             .execute()
     );
   }
diff --git 
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java 
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 94452de00ee..cb85548d5ba 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -294,7 +294,7 @@ public abstract class SQLMetadataConnector implements 
MetadataStorageConnector
             )
         )
     );
-    alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);
+    alterPendingSegmentsTable(tableName);
   }
 
   public void createDataSourceTable(final String tableName)
@@ -481,7 +481,16 @@ public abstract class SQLMetadataConnector implements 
MetadataStorageConnector
     }
   }
 
-  private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String 
tableName)
+  /**
+   * Adds the following columns to the pending segments table to clean up 
unused records,
+   * and to facilitate concurrent append and replace.
+   * 1) task_allocator_id -> The task id / task group id / task replica group 
id of the task that allocated it.
+   * 2) upgraded_from_segment_id -> The id of the segment from which the entry 
was upgraded upon concurrent replace.
+   *
+   * Also, adds an index on (dataSource, task_allocator_id)
+   * @param tableName name of the pending segments table
+   */
+  private void alterPendingSegmentsTable(final String tableName)
   {
     List<String> statements = new ArrayList<>();
     if (tableHasColumn(tableName, "upgraded_from_segment_id")) {
@@ -499,6 +508,14 @@ public abstract class SQLMetadataConnector implements 
MetadataStorageConnector
     if (!statements.isEmpty()) {
       alterTable(tableName, statements);
     }
+
+    final Set<String> createdIndexSet = getIndexOnTable(tableName);
+    createIndex(
+        tableName,
+        StringUtils.format("idx_%1$s_datasource_task_allocator_id", tableName),
+        ImmutableList.of("dataSource", "task_allocator_id"),
+        createdIndexSet
+    );
   }
 
   public void createLogTable(final String tableName, final String 
entryTypeName)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to