This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 54f29fedce Use PreparedBatch while deleting segments (#14639)
54f29fedce is described below

commit 54f29fedce29704f2b3acfb134a9d5735648b279
Author: Jason Koch <[email protected]>
AuthorDate: Sun Jul 23 10:25:04 2023 -0700

    Use PreparedBatch while deleting segments (#14639)
    
    Related to #14634
    
    Changes:
    - Update `IndexerSQLMetadataStorageCoordinator.deleteSegments` to use
    JDBI PreparedBatch instead of issuing single DELETE statements
---
 .../IndexerSQLMetadataStorageCoordinator.java      | 41 +++++++++--------
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 52 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 21 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 307bfb0508..107dc4b9f9 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -1773,32 +1773,31 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
   @Override
   public void deleteSegments(final Set<DataSegment> segments)
   {
-    connector.getDBI().inTransaction(
-        new TransactionCallback<Void>()
-        {
-          @Override
-          public Void inTransaction(Handle handle, TransactionStatus transactionStatus)
-          {
-            int segmentSize = segments.size();
-            String dataSource = "";
-            for (final DataSegment segment : segments) {
-              dataSource = segment.getDataSource();
-              deleteSegment(handle, segment);
-            }
-            log.debugSegments(segments, "Delete the metadata of segments");
-            log.info("Removed [%d] segments from metadata storage for dataSource [%s]!", segmentSize, dataSource);
+    if (segments.isEmpty()) {
+      log.info("No segments to delete.");
+      return;
+    }
 
-            return null;
+    final String deleteSql = StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable());
+    final String dataSource = segments.stream().findFirst().map(DataSegment::getDataSource).get();
+
+    // generate the IDs outside the transaction block
+    final List<String> ids = segments.stream().map(s -> s.getId().toString()).collect(Collectors.toList());
+
+    int numDeletedSegments = connector.getDBI().inTransaction((handle, transactionStatus) -> {
+          final PreparedBatch batch = handle.prepareBatch(deleteSql);
+
+          for (final String id : ids) {
+            batch.bind("id", id).add();
           }
+
+          int[] deletedRows = batch.execute();
+          return Arrays.stream(deletedRows).sum();
         }
     );
-  }
 
-  private void deleteSegment(final Handle handle, final DataSegment segment)
-  {
-    handle.createStatement(StringUtils.format("DELETE from %s WHERE id = :id", dbTables.getSegmentsTable()))
-          .bind("id", segment.getId().toString())
-          .execute();
+    log.debugSegments(segments, "Delete the metadata of segments");
+    log.info("Deleted [%d] segments from metadata storage for dataSource [%s].", numDeletedSegments, dataSource);
   }
 
   private void updatePayload(final Handle handle, final DataSegment segment) throws IOException
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index ed5423a974..74e06bfb66 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -146,6 +146,18 @@ public class IndexerSQLMetadataStorageCoordinatorTest
       100
   );
 
+  private final DataSegment defaultSegment2WithBiggerSize = new DataSegment(
+          "fooDataSource",
+          Intervals.of("2015-01-01T00Z/2015-01-02T00Z"),
+          "version",
+          ImmutableMap.of(),
+          ImmutableList.of("dim1"),
+          ImmutableList.of("m1"),
+          new LinearShardSpec(1),
+          9,
+          200
+  );
+
   private final DataSegment defaultSegment3 = new DataSegment(
       "fooDataSource",
       Intervals.of("2015-01-03T00Z/2015-01-04T00Z"),
@@ -1413,6 +1425,46 @@ public class IndexerSQLMetadataStorageCoordinatorTest
     );
   }
 
+  @Test
+  public void testUpdateSegmentsInMetaDataStorage() throws IOException
+  {
+    // Published segments to MetaDataStorage
+    coordinator.announceHistoricalSegments(SEGMENTS);
+
+    // check segments Published
+    Assert.assertEquals(
+            SEGMENTS,
+            ImmutableSet.copyOf(
+                    coordinator.retrieveUsedSegmentsForInterval(
+                            defaultSegment.getDataSource(),
+                            defaultSegment.getInterval(),
+                            Segments.ONLY_VISIBLE
+                    )
+            )
+    );
+
+    // update single metadata item
+    coordinator.updateSegmentMetadata(Collections.singleton(defaultSegment2WithBiggerSize));
+
+    Collection<DataSegment> updated = coordinator.retrieveUsedSegmentsForInterval(
+            defaultSegment.getDataSource(),
+            defaultSegment.getInterval(),
+            Segments.ONLY_VISIBLE);
+
+    Assert.assertEquals(SEGMENTS.size(), updated.size());
+
+    DataSegment defaultAfterUpdate = updated.stream().filter(s -> s.equals(defaultSegment)).findFirst().get();
+    DataSegment default2AfterUpdate = updated.stream().filter(s -> s.equals(defaultSegment2)).findFirst().get();
+
+    Assert.assertNotNull(defaultAfterUpdate);
+    Assert.assertNotNull(default2AfterUpdate);
+
+    // check that default did not change
+    Assert.assertEquals(defaultSegment.getSize(), defaultAfterUpdate.getSize());
+    // but that default 2 did change
+    Assert.assertEquals(defaultSegment2WithBiggerSize.getSize(), default2AfterUpdate.getSize());
+  }
+
   @Test
   public void testSingleAdditionalNumberedShardWithNoCorePartitions() throws IOException
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to