This is an automated email from the ASF dual-hosted git repository.
amatya pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/30.0.0 by this push:
new b4514e22197 Add new index on datasource and task_allocator_id for
pending segments (#16355) (#16357)
b4514e22197 is described below
commit b4514e22197ed3c9b0df6fb950564d20a610547e
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Tue Apr 30 18:49:50 2024 +0530
Add new index on datasource and task_allocator_id for pending segments
(#16355) (#16357)
* Add pending segments index on datasource and task_allocator_id
* Use both datasource and task_allocator_id in queries
---
.../apache/druid/indexing/overlord/TaskLockbox.java | 5 ++++-
.../druid/indexing/overlord/TaskLockboxTest.java | 6 ++++--
.../test/TestIndexerMetadataStorageCoordinator.java | 2 +-
.../overlord/IndexerMetadataStorageCoordinator.java | 3 ++-
.../IndexerSQLMetadataStorageCoordinator.java | 7 ++++---
.../apache/druid/metadata/SQLMetadataConnector.java | 21 +++++++++++++++++++--
6 files changed, 34 insertions(+), 10 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
index 5d71940d470..1a1369ba1e5 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskLockbox.java
@@ -1242,7 +1242,10 @@ public class TaskLockbox
idsInSameGroup.remove(task.getId());
if (idsInSameGroup.isEmpty()) {
final int pendingSegmentsDeleted
- =
metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(taskAllocatorId);
+ =
metadataStorageCoordinator.deletePendingSegmentsForTaskAllocatorId(
+ task.getDataSource(),
+ taskAllocatorId
+ );
log.info(
"Deleted [%d] entries from pendingSegments table for
pending segments group [%s] with APPEND locks.",
pendingSegmentsDeleted, taskAllocatorId
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
index 3af74235e82..3c95709a9fc 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockboxTest.java
@@ -1948,8 +1948,10 @@ public class TaskLockboxTest
// Only the replaceTask should attempt a delete on the upgradeSegments
table
EasyMock.expect(coordinator.deleteUpgradeSegmentsForTask(replaceTask.getId())).andReturn(0).once();
// Any task may attempt pending segment clean up
-
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getId())).andReturn(0).once();
-
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getId())).andReturn(0).once();
+
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(replaceTask.getDataSource(),
replaceTask.getId()))
+ .andReturn(0).once();
+
EasyMock.expect(coordinator.deletePendingSegmentsForTaskAllocatorId(appendTask.getDataSource(),
appendTask.getId()))
+ .andReturn(0).once();
EasyMock.replay(coordinator);
final TaskLockbox taskLockbox = new TaskLockbox(taskStorage, coordinator);
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
index 6c4f556133e..3aceae494c6 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java
@@ -297,7 +297,7 @@ public class TestIndexerMetadataStorageCoordinator
implements IndexerMetadataSto
}
@Override
- public int deletePendingSegmentsForTaskAllocatorId(final String taskGroup)
+ public int deletePendingSegmentsForTaskAllocatorId(final String datasource,
final String taskAllocatorId)
{
throw new UnsupportedOperationException();
}
diff --git
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
index aea2674f6b8..da6dd9ffd95 100644
---
a/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java
@@ -492,10 +492,11 @@ public interface IndexerMetadataStorageCoordinator
/**
* Delete pending segment for a given task group after all the tasks
belonging to it have completed.
+ * @param datasource datasource of the task
* @param taskAllocatorId task id / task group / replica group for an
appending task
* @return number of pending segments deleted from the metadata store
*/
- int deletePendingSegmentsForTaskAllocatorId(String taskAllocatorId);
+ int deletePendingSegmentsForTaskAllocatorId(String datasource, String
taskAllocatorId);
/**
* Fetches all the pending segments of the datasource that overlap with a
given interval.
diff --git
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index c5a36656c9a..618173a5db7 100644
---
a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++
b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2813,17 +2813,18 @@ public class IndexerSQLMetadataStorageCoordinator
implements IndexerMetadataStor
}
@Override
- public int deletePendingSegmentsForTaskAllocatorId(final String
pendingSegmentsGroup)
+ public int deletePendingSegmentsForTaskAllocatorId(final String datasource,
final String taskAllocatorId)
{
return connector.getDBI().inTransaction(
(handle, status) -> handle
.createStatement(
StringUtils.format(
- "DELETE FROM %s WHERE task_allocator_id =
:task_allocator_id",
+ "DELETE FROM %s WHERE dataSource = :dataSource AND
task_allocator_id = :task_allocator_id",
dbTables.getPendingSegmentsTable()
)
)
- .bind("task_allocator_id", pendingSegmentsGroup)
+ .bind("dataSource", datasource)
+ .bind("task_allocator_id", taskAllocatorId)
.execute()
);
}
diff --git
a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
index 94452de00ee..cb85548d5ba 100644
--- a/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
+++ b/server/src/main/java/org/apache/druid/metadata/SQLMetadataConnector.java
@@ -294,7 +294,7 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
)
)
);
- alterPendingSegmentsTableAddParentIdAndTaskGroup(tableName);
+ alterPendingSegmentsTable(tableName);
}
public void createDataSourceTable(final String tableName)
@@ -481,7 +481,16 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
}
}
- private void alterPendingSegmentsTableAddParentIdAndTaskGroup(final String
tableName)
+ /**
+ * Adds the following columns to the pending segments table to clean up
unused records,
+ * and to facilitate concurrent append and replace.
+ * 1) task_allocator_id -> The task id / task group id / task replica group
id of the task that allocated it.
+ * 2) upgraded_from_segment_id -> The id of the segment from which the entry
was upgraded upon concurrent replace.
+ *
+ * Also, adds an index on (dataSource, task_allocator_id)
+ * @param tableName name of the pending segments table
+ */
+ private void alterPendingSegmentsTable(final String tableName)
{
List<String> statements = new ArrayList<>();
if (tableHasColumn(tableName, "upgraded_from_segment_id")) {
@@ -499,6 +508,14 @@ public abstract class SQLMetadataConnector implements
MetadataStorageConnector
if (!statements.isEmpty()) {
alterTable(tableName, statements);
}
+
+ final Set<String> createdIndexSet = getIndexOnTable(tableName);
+ createIndex(
+ tableName,
+ StringUtils.format("idx_%1$s_datasource_task_allocator_id", tableName),
+ ImmutableList.of("dataSource", "task_allocator_id"),
+ createdIndexSet
+ );
}
public void createLogTable(final String tableName, final String
entryTypeName)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]