This is an automated email from the ASF dual-hosted git repository.
abhishekrb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a70e28a3c28 Parameterize segment IDs (#16174)
a70e28a3c28 is described below
commit a70e28a3c2836c68d9b18f262101ce7290463ddc
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Fri Mar 22 08:20:59 2024 -0700
Parameterize segment IDs (#16174)
* Add parameterized segment IDs.
* Refactor into one common method.
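* Refactor getConditionForIntervalsAndMatchMode - pass in only what's needed (the connector's quote string instead of the whole connector) and return the condition string instead of appending it to a caller-supplied StringBuilder.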
* Minor cleanup.
---
.../IndexerSQLMetadataStorageCoordinator.java | 13 +--
.../druid/metadata/SqlSegmentsMetadataQuery.java | 96 +++++++++++++---------
2 files changed, 63 insertions(+), 46 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index e63dc9c17e0..e2addccbcb9 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -185,11 +185,12 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.allMatch(Intervals::canCompareEndpointsAsStrings);
final SqlSegmentsMetadataQuery.IntervalMode intervalMode =
SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS;
- SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
- queryBuilder,
- compareIntervalEndpointsAsString ? intervals : Collections.emptyList(),
- intervalMode,
- connector
+ queryBuilder.append(
+ SqlSegmentsMetadataQuery.getConditionForIntervalsAndMatchMode(
+ compareIntervalEndpointsAsString ? intervals :
Collections.emptyList(),
+ intervalMode,
+ connector.getQuoteString()
+ )
);
final String queryString = StringUtils.format(queryBuilder.toString(),
dbTables.getSegmentsTable());
@@ -200,7 +201,7 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
.bind("dataSource", dataSource);
if (compareIntervalEndpointsAsString) {
- SqlSegmentsMetadataQuery.bindQueryIntervals(query, intervals);
+ SqlSegmentsMetadataQuery.bindIntervalsToQuery(query, intervals);
}
final List<Pair<DataSegment, String>> segmentsWithCreatedDates =
query
diff --git a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
index 5308f9dd7fe..4d9921d2b06 100644
--- a/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
+++ b/server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataQuery.java
@@ -256,20 +256,20 @@ public class SqlSegmentsMetadataQuery
private List<DataSegmentPlus> retrieveSegmentBatchById(String datasource,
List<String> segmentIds)
{
- if (segmentIds.isEmpty()) {
+ if (CollectionUtils.isNullOrEmpty(segmentIds)) {
return Collections.emptyList();
}
- final String segmentIdCsv = segmentIds.stream()
- .map(id -> "'" + id + "'")
- .collect(Collectors.joining(","));
- ResultIterator<DataSegmentPlus> resultIterator = handle
- .createQuery(
- StringUtils.format(
- "SELECT payload, used FROM %s WHERE dataSource = :dataSource
AND id IN (%s)",
- dbTables.getSegmentsTable(), segmentIdCsv
- )
+ final Query<Map<String, Object>> query = handle.createQuery(
+ StringUtils.format(
+ "SELECT payload, used FROM %s WHERE dataSource = :dataSource %s",
+ dbTables.getSegmentsTable(),
getParameterizedInConditionForColumn("id", segmentIds)
)
+ );
+
+ bindColumnValuesToQueryWithInCondition("id", segmentIds, query);
+
+ ResultIterator<DataSegmentPlus> resultIterator = query
.bind("dataSource", datasource)
.setFetchSize(connector.getStreamingFetchSize())
.map(
@@ -357,7 +357,7 @@ public class SqlSegmentsMetadataQuery
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);
if (hasVersions) {
- sb.append(getConditionForVersions(versions));
+ sb.append(getParameterizedInConditionForColumn("version", versions));
}
final Update stmt = handle
@@ -367,7 +367,7 @@ public class SqlSegmentsMetadataQuery
.bind("used_status_last_updated", DateTimes.nowUtc().toString());
if (hasVersions) {
- bindVersionsToQuery(stmt, versions);
+ bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}
return stmt.execute();
@@ -389,7 +389,7 @@ public class SqlSegmentsMetadataQuery
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);
if (hasVersions) {
- sb.append(getConditionForVersions(versions));
+ sb.append(getParameterizedInConditionForColumn("version", versions));
}
final Update stmt = handle
@@ -401,7 +401,7 @@ public class SqlSegmentsMetadataQuery
.bind("used_status_last_updated", DateTimes.nowUtc().toString());
if (hasVersions) {
- bindVersionsToQuery(stmt, versions);
+ bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}
return stmt.execute();
} else {
@@ -471,28 +471,28 @@ public class SqlSegmentsMetadataQuery
}
/**
- * Append the condition for the interval and match mode to the given string
builder with a partial query
- * @param sb - StringBuilder containing the paritial query with SELECT
clause and WHERE condition for used, datasource
+ * Get the condition for the interval and match mode.
* @param intervals - intervals to fetch the segments for
* @param matchMode - Interval match mode - overlaps or contains
- * @param connector - SQL connector
+ * @param quoteString - the connector-specific quote string
*/
- public static void appendConditionForIntervalsAndMatchMode(
- final StringBuilder sb,
+ public static String getConditionForIntervalsAndMatchMode(
final Collection<Interval> intervals,
final IntervalMode matchMode,
- final SQLMetadataConnector connector
+ final String quoteString
)
{
if (intervals.isEmpty()) {
- return;
+ return "";
}
+ final StringBuilder sb = new StringBuilder();
+
sb.append(" AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
matchMode.makeSqlCondition(
- connector.getQuoteString(),
+ quoteString,
StringUtils.format(":start%d", i),
StringUtils.format(":end%d", i)
)
@@ -525,14 +525,14 @@ public class SqlSegmentsMetadataQuery
));
}
sb.append(")");
+ return sb.toString();
}
/**
- * Given a Query object bind the input intervals to it
- * @param query Query to fetch segments
- * @param intervals Intervals to fetch segments for
+ * Bind the supplied {@code intervals} to {@code query}.
+ * @see #getConditionForIntervalsAndMatchMode(Collection, IntervalMode,
String)
*/
- public static void bindQueryIntervals(final Query<Map<String, Object>>
query, final Collection<Interval> intervals)
+ public static void bindIntervalsToQuery(final Query<Map<String, Object>>
query, final Collection<Interval> intervals)
{
if (intervals.isEmpty()) {
return;
@@ -730,13 +730,13 @@ public class SqlSegmentsMetadataQuery
}
if (compareAsString) {
- appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode,
connector);
+ sb.append(getConditionForIntervalsAndMatchMode(intervals, matchMode,
connector.getQuoteString()));
}
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);
if (hasVersions) {
- sb.append(getConditionForVersions(versions));
+ sb.append(getParameterizedInConditionForColumn("version", versions));
}
// Add the used_status_last_updated time filter only for unused segments
when maxUsedStatusLastUpdatedTime is non-null.
@@ -783,11 +783,11 @@ public class SqlSegmentsMetadataQuery
}
if (compareAsString) {
- bindQueryIntervals(sql, intervals);
+ bindIntervalsToQuery(sql, intervals);
}
if (hasVersions) {
- bindVersionsToQuery(sql, versions);
+ bindColumnValuesToQueryWithInCondition("version", versions, sql);
}
return sql;
@@ -890,18 +890,24 @@ public class SqlSegmentsMetadataQuery
return numChangedSegments;
}
- private static String getConditionForVersions(final List<String> versions)
+ /**
+ * @return a parameterized {@code IN} clause for the specified {@code
columnName}. The column values need to be bound
+ * to a query by calling {@link
#bindColumnValuesToQueryWithInCondition(String, List, SQLStatement)}.
+ *
+ * @implNote JDBI 3.x has better support for binding {@code IN} clauses
directly.
+ */
+ private static String getParameterizedInConditionForColumn(final String
columnName, final List<String> values)
{
- if (CollectionUtils.isNullOrEmpty(versions)) {
+ if (CollectionUtils.isNullOrEmpty(values)) {
return "";
}
final StringBuilder sb = new StringBuilder();
- sb.append(" AND version IN (");
- for (int i = 0; i < versions.size(); i++) {
- sb.append(StringUtils.format(":version%d", i));
- if (i != versions.size() - 1) {
+ sb.append(StringUtils.format(" AND %s IN (", columnName));
+ for (int i = 0; i < values.size(); i++) {
+ sb.append(StringUtils.format(":%s%d", columnName, i));
+ if (i != values.size() - 1) {
sb.append(",");
}
}
@@ -909,14 +915,24 @@ public class SqlSegmentsMetadataQuery
return sb.toString();
}
- private static void bindVersionsToQuery(final SQLStatement query, final
List<String> versions)
+ /**
+ * Binds the provided list of {@code values} to the specified {@code
columnName} in the given SQL {@code query} that
+ * contains an {@code IN} clause.
+ *
+ * @see #getParameterizedInConditionForColumn(String, List)
+ */
+ private static void bindColumnValuesToQueryWithInCondition(
+ final String columnName,
+ final List<String> values,
+ final SQLStatement<?> query
+ )
{
- if (CollectionUtils.isNullOrEmpty(versions)) {
+ if (CollectionUtils.isNullOrEmpty(values)) {
return;
}
- for (int i = 0; i < versions.size(); i++) {
- query.bind(StringUtils.format("version%d", i), versions.get(i));
+ for (int i = 0; i < values.size(); i++) {
+ query.bind(StringUtils.format("%s%d", columnName, i), values.get(i));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]