This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6891866c434 Process retrieval of parent and child segment ids in batches (#16734)
6891866c434 is described below
commit 6891866c4341a66074db3908204bcf673f30143f
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Jul 15 18:24:23 2024 +0530
Process retrieval of parent and child segment ids in batches (#16734)
---
.../IndexerSQLMetadataStorageCoordinator.java | 99 +++++++++++-----------
.../IndexerSQLMetadataStorageCoordinatorTest.java | 93 ++++++++++++++++++++
2 files changed, 144 insertions(+), 48 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index 54f75ccb920..ecfad572e74 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -2954,28 +2954,30 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return Collections.emptyMap();
}
- final List<String> segmentIdList = ImmutableList.copyOf(segmentIds);
- final String sql = StringUtils.format(
- "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource =
:dataSource %s",
- dbTables.getSegmentsTable(),
- SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id",
segmentIdList)
- );
final Map<String, String> upgradedFromSegmentIds = new HashMap<>();
- connector.retryWithHandle(
- handle -> {
- Query<Map<String, Object>> query = handle.createQuery(sql)
- .bind("dataSource",
dataSource);
-
SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id",
segmentIdList, query);
- return query.map((index, r, ctx) -> {
- final String id = r.getString(1);
- final String upgradedFromSegmentId = r.getString(2);
- if (upgradedFromSegmentId != null) {
- upgradedFromSegmentIds.put(id, upgradedFromSegmentId);
- }
- return null;
- }).list();
- }
- );
+ final List<List<String>> partitions =
Lists.partition(ImmutableList.copyOf(segmentIds), 100);
+ for (List<String> partition : partitions) {
+ final String sql = StringUtils.format(
+ "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource =
:dataSource %s",
+ dbTables.getSegmentsTable(),
+ SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("id",
partition)
+ );
+ connector.retryWithHandle(
+ handle -> {
+ Query<Map<String, Object>> query = handle.createQuery(sql)
+ .bind("dataSource",
dataSource);
+
SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition("id",
partition, query);
+ return query.map((index, r, ctx) -> {
+ final String id = r.getString(1);
+ final String upgradedFromSegmentId = r.getString(2);
+ if (upgradedFromSegmentId != null) {
+ upgradedFromSegmentIds.put(id, upgradedFromSegmentId);
+ }
+ return null;
+ }).list();
+ }
+ );
+ }
return upgradedFromSegmentIds;
}
@@ -2989,39 +2991,40 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
return Collections.emptyMap();
}
- final List<String> upgradedFromSegmentIdList =
ImmutableList.copyOf(segmentIds);
- final String sql = StringUtils.format(
- "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource =
:dataSource %s",
- dbTables.getSegmentsTable(),
- SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn(
- "upgraded_from_segment_id",
- upgradedFromSegmentIdList
- )
- );
final Map<String, Set<String>> upgradedToSegmentIds = new HashMap<>();
retrieveSegmentsById(dataSource, segmentIds)
.stream()
.map(DataSegment::getId)
.map(SegmentId::toString)
.forEach(id -> upgradedToSegmentIds.computeIfAbsent(id, k -> new
HashSet<>()).add(id));
- connector.retryWithHandle(
- handle -> {
- Query<Map<String, Object>> query = handle.createQuery(sql)
- .bind("dataSource",
dataSource);
- SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition(
- "upgraded_from_segment_id",
- upgradedFromSegmentIdList,
- query
- );
- return query.map((index, r, ctx) -> {
- final String upgradedToId = r.getString(1);
- final String id = r.getString(2);
- upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>())
- .add(upgradedToId);
- return null;
- }).list();
- }
- );
+
+ final List<List<String>> partitions =
Lists.partition(ImmutableList.copyOf(segmentIds), 100);
+ for (List<String> partition : partitions) {
+ final String sql = StringUtils.format(
+ "SELECT id, upgraded_from_segment_id FROM %s WHERE dataSource =
:dataSource %s",
+ dbTables.getSegmentsTable(),
+
SqlSegmentsMetadataQuery.getParameterizedInConditionForColumn("upgraded_from_segment_id",
partition)
+ );
+
+ connector.retryWithHandle(
+ handle -> {
+ Query<Map<String, Object>> query = handle.createQuery(sql)
+ .bind("dataSource",
dataSource);
+ SqlSegmentsMetadataQuery.bindColumnValuesToQueryWithInCondition(
+ "upgraded_from_segment_id",
+ partition,
+ query
+ );
+ return query.map((index, r, ctx) -> {
+ final String upgradedToId = r.getString(1);
+ final String id = r.getString(2);
+ upgradedToSegmentIds.computeIfAbsent(id, k -> new HashSet<>())
+ .add(upgradedToId);
+ return null;
+ }).list();
+ }
+ );
+ }
return upgradedToSegmentIds;
}
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index f352d5e2609..6eccbccaa84 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -3452,6 +3452,48 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
}
+ /**
+ * Verifies that retrieveUpgradedFromSegmentIds still returns the complete result
+ * when the number of requested IDs (500) is larger than the partition size of 100
+ * used by the coordinator, i.e. when the lookup is split across several queries.
+ */
+ @Test
+ public void testRetrieveUpgradedFromSegmentIdsInBatches()
+ {
+ final int size = 500;
+ // NOTE(review): despite its name, batchSize is used below as the number of
+ // 5-segment groups (size / 5); it equals the coordinator's partition size of
+ // 100 only by coincidence.
+ final int batchSize = 100;
+
+ // Segment i gets version "v " + (i % 5) and partition number i / 5, so every
+ // segment ID is distinct and segments 5*k .. 5*k+4 form group k.
+ List<DataSegment> segments = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ segments.add(
+ new DataSegment(
+ "DS",
+ Intervals.ETERNITY,
+ "v " + (i % 5),
+ ImmutableMap.of("num", i / 5),
+ ImmutableList.of("dim"),
+ ImmutableList.of("agg"),
+ new NumberedShardSpec(i / 5, 0),
+ 0,
+ 100L
+ )
+ );
+ }
+ // In each group, segments 5*k+1 .. 5*k+4 record segment 5*k as the segment they
+ // were upgraded from: 100 groups * 4 children = 400 entries.
+ Map<String, String> expected = new HashMap<>();
+ for (int i = 0; i < batchSize; i++) {
+ for (int j = 1; j < 5; j++) {
+ expected.put(
+ segments.get(5 * i + j).getId().toString(),
+ segments.get(5 * i).getId().toString()
+ );
+ }
+ }
+ // Presumably persists each map value as the upgraded_from_segment_id of the
+ // key's row; the helper's body is not visible here, so confirm against it.
+ insertUsedSegments(ImmutableSet.copyOf(segments), expected);
+
+ // Request all 500 IDs in a single call. The 100 group roots have no parent, so
+ // they are expected to be absent from the result.
+ Map<String, String> actual = coordinator.retrieveUpgradedFromSegmentIds(
+ "DS",
+
segments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+ );
+
+ Assert.assertEquals(400, actual.size());
+ Assert.assertEquals(expected, actual);
+ }
+
@Test
public void testRetrieveUpgradedToSegmentIds()
{
@@ -3478,6 +3520,57 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
}
+ /**
+ * Verifies that retrieveUpgradedToSegmentIds still returns the complete result
+ * when the number of requested IDs (500) is larger than the partition size of 100
+ * used by the coordinator, i.e. when the lookup is split across several queries.
+ */
+ @Test
+ public void testRetrieveUpgradedToSegmentIdsInBatches()
+ {
+ final int size = 500;
+ // NOTE(review): despite its name, batchSize is used below as the number of
+ // 5-segment groups (size / 5); it equals the coordinator's partition size of
+ // 100 only by coincidence.
+ final int batchSize = 100;
+
+ // Segment i gets version "v " + (i % 5) and partition number i / 5, so every
+ // segment ID is distinct and segments 5*k .. 5*k+4 form group k.
+ List<DataSegment> segments = new ArrayList<>();
+ for (int i = 0; i < size; i++) {
+ segments.add(
+ new DataSegment(
+ "DS",
+ Intervals.ETERNITY,
+ "v " + (i % 5),
+ ImmutableMap.of("num", i / 5),
+ ImmutableList.of("dim"),
+ ImmutableList.of("agg"),
+ new NumberedShardSpec(i / 5, 0),
+ 0,
+ 100L
+ )
+ );
+ }
+
+ // Every requested segment that exists is expected to map to a set that contains
+ // at least its own ID.
+ Map<String, Set<String>> expected = new HashMap<>();
+ for (DataSegment segment : segments) {
+ final String id = segment.getId().toString();
+ expected.put(id, new HashSet<>());
+ expected.get(id).add(id);
+ }
+ // In each group, segments 5*k+1 .. 5*k+4 are recorded as upgraded from segment
+ // 5*k, so each group root's expected set also gains those four child IDs.
+ Map<String, String> upgradeMap = new HashMap<>();
+ for (int i = 0; i < batchSize; i++) {
+ for (int j = 1; j < 5; j++) {
+ upgradeMap.put(
+ segments.get(5 * i + j).getId().toString(),
+ segments.get(5 * i).getId().toString()
+ );
+ expected.get(segments.get(5 * i).getId().toString())
+ .add(segments.get(5 * i + j).getId().toString());
+ }
+ }
+ // Presumably persists each map value as the upgraded_from_segment_id of the
+ // key's row; the helper's body is not visible here, so confirm against it.
+ insertUsedSegments(ImmutableSet.copyOf(segments), upgradeMap);
+
+ // Request all 500 IDs in a single call; every one of them must appear as a key.
+ Map<String, Set<String>> actual = coordinator.retrieveUpgradedToSegmentIds(
+ "DS",
+
segments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+ );
+
+ Assert.assertEquals(500, actual.size());
+ Assert.assertEquals(expected, actual);
+ }
+
private void insertUsedSegments(Set<DataSegment> segments, Map<String,
String> upgradedFromSegmentIdMap)
{
final String table =
derbyConnectorRule.metadataTablesConfigSupplier().get().getSegmentsTable();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]