This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6891866c434 Process retrieval of parent and child segment ids in batches (#16734)
6891866c434 is described below

commit 6891866c4341a66074db3908204bcf673f30143f
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Jul 15 18:24:23 2024 +0530

    Process retrieval of parent and child segment ids in batches (#16734)
---
 .../IndexerSQLMetadataStorageCoordinator.java      | 99 +++++++++++-----------
 .../IndexerSQLMetadataStorageCoordinatorTest.java  | 93 ++++++++++++++++++++
 2 files changed, 144 insertions(+), 48 deletions(-)

diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 54f75ccb920..ecfad572e74 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2954,28 +2954,30 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       return Collections.emptyMap();
     }
 
-    final List<String> segmentIdList = ImmutableList.copyOf(segmentIds);
-    final String sql = StringUtils.format(
-        "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
-        dbTables.getSegmentsTable(),
-        SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", segmentIdList)
-    );
     final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
-    connector.retryWithHandle(
-        handle -> {
-          Query<Map<String, Object>> query = handle.createQuery(sql)
-                                                   .bind("dataSource", dataSource);
-          SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", segmentIdList, query);
-          return query.map((index, r, ctx) -> {
-            final String id = r.getString(1);
-            final String upgradedFromSegmentId = r.getString(2);
-            if (upgradedFromSegmentId != null) {
-              upgradedFromSegmentIds.put(id, upgradedFromSegmentId);
-            }
-            return null;
-          }).list();
-        }
-    );
+    final List<List<String>> partitions = Lists.partition(ImmutableList.copyOf(segmentIds), 100);
+    for (List<String> partition : partitions) {
+      final String sql = StringUtils.format(
+          "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
+          dbTables.getSegmentsTable(),
+          SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id", partition)
+      );
+      connector.retryWithHandle(
+          handle -> {
+            Query<Map<String, Object>> query = handle.createQuery(sql)
+                                                     .bind("dataSource", dataSource);
+            SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id", partition, query);
+            return query.map((index, r, ctx) -> {
+              final String id = r.getString(1);
+              final String upgradedFromSegmentId = r.getString(2);
+              if (upgradedFromSegmentId != null) {
+                upgradedFromSegmentIds.put(id, upgradedFromSegmentId);
+              }
+              return null;
+            }).list();
+          }
+      );
+    }
     return upgradedFromSegmentIds;
   }
 
@@ -2989,39 +2991,40 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
       return Collections.emptyMap();
     }
 
-    final List<String> upgradedFromSegmentIdList = ImmutableList.copyOf(segmentIds);
-    final String sql = StringUtils.format(
-        "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
-        dbTables.getSegmentsTable(),
-        SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn(
-            "upgraded_from_segment_id",
-            upgradedFromSegmentIdList
-        )
-    );
     final Map<String, Set<String>> upgradedToSegmentIds = new HashMap<>();
     retrieveSegmentsById(dataSource, segmentIds)
         .stream()
         .map(DataSegment::getId)
         .map(SegmentId::toString)
         .forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>()).add(id));
-    connector.retryWithHandle(
-        handle -> {
-          Query<Map<String, Object>> query = handle.createQuery(sql)
-                                                   .bind("dataSource", dataSource);
-          SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition(
-              "upgraded_from_segment_id",
-              upgradedFromSegmentIdList,
-              query
-          );
-          return query.map((index, r, ctx) -> {
-            final String upgradedToId = r.getString(1);
-            final String id = r.getString(2);
-            upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>())
-                                .add(upgradedToId);
-            return null;
-          }).list();
-        }
-    );
+
+    final List<List<String>> partitions = Lists.partition(ImmutableList.copyOf(segmentIds), 100);
+    for (List<String> partition : partitions) {
+      final String sql = StringUtils.format(
+          "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource = :dataSource %s",
+          dbTables.getSegmentsTable(),
+          SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("upgraded_from_segment_id", partition)
+      );
+
+      connector.retryWithHandle(
+          handle -> {
+            Query<Map<String, Object>> query = handle.createQuery(sql)
+                                                     .bind("dataSource", dataSource);
+            SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition(
+                "upgraded_from_segment_id",
+                partition,
+                query
+            );
+            return query.map((index, r, ctx) -> {
+              final String upgradedToId = r.getString(1);
+              final String id = r.getString(2);
+              upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>())
+                                  .add(upgradedToId);
+              return null;
+            }).list();
+          }
+      );
+    }
     return upgradedToSegmentIds;
   }
 
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index f352d5e2609..6eccbccaa84 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -3452,6 +3452,48 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     );
   }
 
+  @Test
+  public void testRetrieveUpgradedFromSegmentIdsInBatches()
+  {
+    final int size = 500;
+    final int batchSize = 100;
+
+    List<DataSegment> segments = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      segments.add(
+          new DataSegment(
+              "DS",
+              Intervals.ETERNITY,
+              "v " + (i % 5),
+              ImmutableMap.of("num", i / 5),
+              ImmutableList.of("dim"),
+              ImmutableList.of("agg"),
+              new NumberedShardSpec(i / 5, 0),
+              0,
+              100L
+          )
+      );
+    }
+    Map<String, String> expected = new HashMap<>();
+    for (int i = 0; i < batchSize; i++) {
+      for (int j = 1; j < 5; j++) {
+        expected.put(
+            segments.get(5 * i + j).getId().toString(),
+            segments.get(5 * i).getId().toString()
+        );
+      }
+    }
+    insertUsedSegments(ImmutableSet.copyOf(segments), expected);
+
+    Map<String, String> actual = coordinator.retrieveUpgradedFromSegmentIds(
+        "DS",
+        
segments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+    );
+
+    Assert.assertEquals(400, actual.size());
+    Assert.assertEquals(expected, actual);
+  }
+
   @Test
   public void testRetrieveUpgradedToSegmentIds()
   {
@@ -3478,6 +3520,57 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
     );
   }
 
+  @Test
+  public void testRetrieveUpgradedToSegmentIdsInBatches()
+  {
+    final int size = 500;
+    final int batchSize = 100;
+
+    List<DataSegment> segments = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      segments.add(
+          new DataSegment(
+              "DS",
+              Intervals.ETERNITY,
+              "v " + (i % 5),
+              ImmutableMap.of("num", i / 5),
+              ImmutableList.of("dim"),
+              ImmutableList.of("agg"),
+              new NumberedShardSpec(i / 5, 0),
+              0,
+              100L
+          )
+      );
+    }
+
+    Map<String, Set<String>> expected = new HashMap<>();
+    for (DataSegment segment : segments) {
+      final String id = segment.getId().toString();
+      expected.put(id, new HashSet<>());
+      expected.get(id).add(id);
+    }
+    Map<String, String> upgradeMap = new HashMap<>();
+    for (int i = 0; i < batchSize; i++) {
+      for (int j = 1; j < 5; j++) {
+        upgradeMap.put(
+            segments.get(5 * i + j).getId().toString(),
+            segments.get(5 * i).getId().toString()
+        );
+        expected.get(segments.get(5 * i).getId().toString())
+                .add(segments.get(5 * i + j).getId().toString());
+      }
+    }
+    insertUsedSegments(ImmutableSet.copyOf(segments), upgradeMap);
+
+    Map<String, Set<String>> actual = coordinator.retrieveUpgradedToSegmentIds(
+        "DS",
+        
segments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+    );
+
+    Assert.assertEquals(500, actual.size());
+    Assert.assertEquals(expected, actual);
+  }
+
   private void insertUsedSegments(Set<DataSegment> segments, Map<String, String> upgradedFromSegmentIdMap)
   {
     final String table = derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to