This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 54f29fedce Use PreparedBatch while deleting segments (#14639)
54f29fedce is described below
commit 54f29fedce29704f2b3acfb134a9d5735648b279
Author: Jason Koch <[email protected]>
AuthorDate: Sun Jul 23 10:25:04 2023 -0700
Use PreparedBatch while deleting segments (#14639)
Related to #14634
Changes:
- Update `IndexerSQLMetadataStorageCoordinator.deleteSegments` to use a
JDBI `PreparedBatch` instead of issuing a separate DELETE statement for each segment.
---
.../IndexerSQLMetadataStorageCoordinator.java | 41 +++++++++--------
.../IndexerSQLMetadataStorageCoordinatorTest.java | 52 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 21 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 307bfb0508..107dc4b9f9 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1773,32 +1773,31 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
@Override
public void deleteSegments(final Set<DataSegment> segments)
{
- connector.getDBI().inTransaction(
- new TransactionCallback<Void>()
- {
- @Override
- public Void inTransaction(Handle handle, TransactionStatus
transactionStatus)
- {
- int segmentSize = segments.size();
- String dataSource = "";
- for (final DataSegment segment : segments) {
- dataSource = segment.getDataSource();
- deleteSegment(handle, segment);
- }
- log.debugSegments(segments, "Delete the metadata of segments");
- log.info("Removed [%d] segments from metadata storage for
dataSource [%s]!", segmentSize, dataSource);
+ if (segments.isEmpty()) {
+ log.info("No segments to delete.");
+ return;
+ }
- return null;
+ final String deleteSql = StringUtils.format("DELETE from %s WHERE id =
:id", dbTables.getSegmentsTable());
+ final String dataSource =
segments.stream().findFirst().map(DataSegment::getDataSource).get();
+
+ // generate the IDs outside the transaction block
+ final List<String> ids = segments.stream().map(s ->
s.getId().toString()).collect(Collectors.toList());
+
+ int numDeletedSegments = connector.getDBI().inTransaction((handle,
transactionStatus) -> {
+ final PreparedBatch batch = handle.prepareBatch(deleteSql);
+
+ for (final String id : ids) {
+ batch.bind("id", id).add();
}
+
+ int[] deletedRows = batch.execute();
+ return Arrays.stream(deletedRows).sum();
}
);
- }
- private void deleteSegment(final Handle handle, final DataSegment segment)
- {
- handle.createStatement(StringUtils.format("DELETE from %s WHERE id = :id",
dbTables.getSegmentsTable()))
- .bind("id", segment.getId().toString())
- .execute();
+ log.debugSegments(segments, "Delete the metadata of segments");
+ log.info("Deleted [%d] segments from metadata storage for dataSource
[%s].", numDeletedSegments, dataSource);
}
private void updatePayload(final Handle handle, final DataSegment segment)
throws IOException
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index ed5423a974..74e06bfb66 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -146,6 +146,18 @@ public class IndexerSQLMetadataStorageCoordinatorTest
100
);
+ private final DataSegment defaultSegment2WithBiggerSize = new DataSegment(
+ "fooDataSource",
+ Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
+ "version",
+ ImmutableMap.of(),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new LinearShardSpec(1),
+ 9,
+ 200
+ );
+
private final DataSegment defaultSegment3 = new DataSegment(
"fooDataSource",
Intervals.of("2015-01-03T00Z/2015-01-04T00Z"),
@@ -1413,6 +1425,46 @@ public class IndexerSQLMetadataStorageCoordinatorTest
);
}
+ @Test
+ public void testUpdateSegmentsInMetaDataStorage() throws IOException
+ {
+ // Published segments to MetaDataStorage
+ coordinator.announceHistoricalSegments(SEGMENTS);
+
+ // check segments Published
+ Assert.assertEquals(
+ SEGMENTS,
+ ImmutableSet.copyOf(
+ coordinator.retrieveUsedSegmentsForInterval(
+ defaultSegment.getDataSource(),
+ defaultSegment.getInterval(),
+ Segments.ONLY_VISIBLE
+ )
+ )
+ );
+
+ // update single metadata item
+
coordinator.updateSegmentMetadata(Collections.singleton(defaultSegment2WithBiggerSize));
+
+ Collection<DataSegment> updated =
coordinator.retrieveUsedSegmentsForInterval(
+ defaultSegment.getDataSource(),
+ defaultSegment.getInterval(),
+ Segments.ONLY_VISIBLE);
+
+ Assert.assertEquals(SEGMENTS.size(), updated.size());
+
+ DataSegment defaultAfterUpdate = updated.stream().filter(s ->
s.equals(defaultSegment)).findFirst().get();
+ DataSegment default2AfterUpdate = updated.stream().filter(s ->
s.equals(defaultSegment2)).findFirst().get();
+
+ Assert.assertNotNull(defaultAfterUpdate);
+ Assert.assertNotNull(default2AfterUpdate);
+
+ // check that default did not change
+ Assert.assertEquals(defaultSegment.getSize(),
defaultAfterUpdate.getSize());
+ // but that default 2 did change
+ Assert.assertEquals(defaultSegment2WithBiggerSize.getSize(),
default2AfterUpdate.getSize());
+ }
+
@Test
public void testSingleAdditionalNumberedShardWithNoCorePartitions() throws
IOException
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]