kfaraz commented on code in PR #15169:
URL: https://github.com/apache/druid/pull/15169#discussion_r1368165054
##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -347,10 +347,13 @@ SegmentPublishResult commitReplaceSegments(
* </ul>
*
* @param replaceSegments Segments being committed by a REPLACE task
+ * @param activeRealtimeSequencePrefixes Set of base sequence names of active and pending completion task groups
Review Comment:
Please update the javadoc to say prefixes instead of sequence names.
##########
extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java:
##########
@@ -217,6 +217,13 @@ public void testMaterializedViewSupervisorSpecCreated()
Assert.assertTrue(e instanceof UnsupportedOperationException);
}
+ try {
Review Comment:
Use `Assert.assertThrows` instead and update the existing assertions in this
method too.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
/**
* Fetches all the pending segments, whose interval overlaps with the given
- * search interval from the metadata store. Returns a Map from the
- * pending segment ID to the sequence name.
+ * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+ * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
*/
+ @VisibleForTesting
Review Comment:
Avoid using this. It is okay to just test the method which invokes this
internally.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1833,26 +1900,26 @@ private Set<DataSegment> announceHistoricalSegmentBatch(
for (DataSegment segment : partition) {
final String now = DateTimes.nowUtc().toString();
preparedBatch.add()
- .bind("id", segment.getId().toString())
- .bind("dataSource", segment.getDataSource())
- .bind("created_date", now)
- .bind("start", segment.getInterval().getStart().toString())
- .bind("end", segment.getInterval().getEnd().toString())
- .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
- .bind("version", segment.getVersion())
- .bind("used", usedSegments.contains(segment))
- .bind("payload", jsonMapper.writeValueAsBytes(segment))
- .bind("used_status_last_updated", now);
+ .bind("id", segment.getId().toString())
Review Comment:
Since this is a sensitive PR for a new feature, let's revert all these
formatting changes for now. Better to do reformatting in refactor PRs.
Please revert all formatting changes in other lines in this class too.
##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -93,4 +94,9 @@ default Boolean isHealthy()
LagStats computeLagStats();
int getActiveTaskGroupsCount();
+
+ /**
+ * @return active base sequence names for reading and pending completion task groups of a seekable stream supervisor
Review Comment:
Update: prefixes instead of sequence names.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
/**
* Fetches all the pending segments, whose interval overlaps with the given
- * search interval from the metadata store. Returns a Map from the
- * pending segment ID to the sequence name.
+ * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+ * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
*/
+ @VisibleForTesting
+ Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+ final Handle handle,
+ final String dataSource,
+ final Interval interval,
+ final Set<String> sequenceNamePrefixFilter
+ ) throws IOException
+ {
+ if (sequenceNamePrefixFilter.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
+ StringBuilder sql = new StringBuilder(
+ "SELECT sequence_name, payload FROM "
+ + dbTables.getPendingSegmentsTable()
+ + " WHERE dataSource = :dataSource AND start < :end AND "
+ + connector.getQuoteString() + "end" + connector.getQuoteString() + " > :start"
+ );
+
+ sql.append(" AND ( ");
+
+ for (int i = 1; i < sequenceNamePrefixes.size(); i++) {
+ sql.append("(sequence_name LIKE ")
+ .append(StringUtils.format(":prefix%d", i))
+ .append(")")
+ .append(" OR ");
+ }
+
+ sql.append("(sequence_name LIKE ")
+ .append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size()))
+ .append(")");
+
+ sql.append(" )");
+
+ Query<Map<String, Object>> query =
+ handle.createQuery(sql.toString())
+ .bind("dataSource", dataSource)
+ .bind("start", interval.getStart().toString())
+ .bind("end", interval.getEnd().toString());
+
+ for (int i = 1; i <= sequenceNamePrefixes.size(); i++) {
+ query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i - 1) + "%");
Review Comment:
What do we need the `%` for in `sequenceNamePrefixes.get(i - 1) + "%"`?
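For context on the `%`: in SQL `LIKE`, a pattern without wildcards is essentially an equality match, so appending `%` is what turns each bound value into a "starts with" filter. One side effect worth noting is that `_` is also a `LIKE` wildcard (any single character), so an unescaped `_` inside a prefix matches more than the literal character unless it is escaped (for example with an `ESCAPE` clause). The sketch below emulates only the `%`/`_` subset of `LIKE` in plain Java; the class name and the sample prefix `index_kafka_wiki_abc` are hypothetical and are not taken from Druid.

```java
import java.util.regex.Pattern;

/** Hypothetical helper emulating the '%' and '_' wildcards of SQL LIKE (no ESCAPE support). */
final class LikePrefixDemo {

  /** Translates a LIKE pattern into an anchored regex: '%' -> ".*", '_' -> ".", others literal. */
  static Pattern likeToRegex(String likePattern) {
    StringBuilder regex = new StringBuilder();
    for (char c : likePattern.toCharArray()) {
      if (c == '%') {
        regex.append(".*");
      } else if (c == '_') {
        regex.append('.');
      } else {
        regex.append(Pattern.quote(String.valueOf(c)));
      }
    }
    return Pattern.compile(regex.toString(), Pattern.DOTALL);
  }

  /** LIKE must match the whole value, so use matches() rather than find(). */
  static boolean like(String value, String likePattern) {
    return likeToRegex(likePattern).matcher(value).matches();
  }

  public static void main(String[] args) {
    String prefix = "index_kafka_wiki_abc"; // hypothetical sequence-name prefix
    // Without '%', LIKE behaves like equality and misses longer sequence names.
    System.out.println(like("index_kafka_wiki_abc_0", prefix));       // false
    // With the trailing '%', LIKE becomes a "starts with" match.
    System.out.println(like("index_kafka_wiki_abc_0", prefix + "%")); // true
    // Unescaped '_' in the prefix is itself a wildcard, so this also matches.
    System.out.println(like("indexXkafka_wiki_abc_0", prefix + "%")); // true
  }
}
```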
##########
extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java:
##########
@@ -295,6 +295,12 @@ public LagStats computeLagStats()
throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
}
+ @Override
+ public Set<String> getActiveBaseSequenceNames()
+ {
+ throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor");
Review Comment:
Please fix this as the message is redundant.
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
/**
* Fetches all the pending segments, whose interval overlaps with the given
- * search interval from the metadata store. Returns a Map from the
- * pending segment ID to the sequence name.
+ * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+ * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
*/
+ @VisibleForTesting
+ Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+ final Handle handle,
+ final String dataSource,
+ final Interval interval,
+ final Set<String> sequenceNamePrefixFilter
+ ) throws IOException
+ {
+ if (sequenceNamePrefixFilter.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
+ StringBuilder sql = new StringBuilder(
Review Comment:
It would be much easier to read and understand the SQL if it were written using a
format string:
```
SELECT sequence_name, payload
FROM %s
WHERE dataSource = :dataSource
AND start < :end
AND end > :start
AND (%s)
```
Then build a separate `sequenceFilterCondition` string for the second `%s`, which would
basically be a bunch of filters tied by `OR`s (with `end` still quoted via
`connector.getQuoteString()` as before).
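A minimal sketch of the format-string approach, assuming the table name, the quote string and the prebuilt filter condition are passed in (in the PR they would come from `dbTables`, `connector` and the prefix loop, none of which are modelled here). It uses plain `String.format` in place of Druid's `StringUtils.format`. The filter condition is substituted into the template rather than bound as a `:sequenceFilterCondition` parameter, because a JDBI bind value is sent as a literal value, not as SQL. The class name is hypothetical.

```java
import java.util.Locale;

/** Hypothetical sketch: builds the pending-segments query from a single format template. */
final class PendingSegmentsSql {

  // %1$s = table name, %2$s = identifier quote (for the reserved word "end"),
  // %3$s = OR-joined sequence_name LIKE conditions. :dataSource/:start/:end stay bind params.
  private static final String TEMPLATE =
      "SELECT sequence_name, payload FROM %1$s"
      + " WHERE dataSource = :dataSource"
      + " AND start < :end"
      + " AND %2$send%2$s > :start"
      + " AND (%3$s)";

  static String build(String table, String quote, String sequenceFilterCondition) {
    return String.format(Locale.ROOT, TEMPLATE, table, quote, sequenceFilterCondition);
  }
}
```

For example, `PendingSegmentsSql.build("druid_pendingSegments", "\"", "(sequence_name LIKE :prefix0)")` yields a single-line query whose structure is visible at a glance in `TEMPLATE`; the table name there is only an example value.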
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
/**
* Fetches all the pending segments, whose interval overlaps with the given
- * search interval from the metadata store. Returns a Map from the
- * pending segment ID to the sequence name.
+ * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+ * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
*/
+ @VisibleForTesting
+ Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+ final Handle handle,
+ final String dataSource,
+ final Interval interval,
+ final Set<String> sequenceNamePrefixFilter
+ ) throws IOException
+ {
+ if (sequenceNamePrefixFilter.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
+ StringBuilder sql = new StringBuilder(
Review Comment:
Use `.append` instead of `+` as it is already a `StringBuilder`.
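A small sketch of what the `.append` style looks like for the `SELECT` clause, assuming the table name and quote string are plain inputs (the class and method names are hypothetical). Since `javac` already compiles `+` chains efficiently, the main benefit here is consistency with the rest of the builder rather than performance.

```java
/** Hypothetical sketch: the query head built with chained append() calls instead of '+'. */
final class AppendChainSketch {

  static String selectClause(String table, String quote) {
    // Each fragment goes straight into the StringBuilder; no intermediate '+' concatenation.
    return new StringBuilder()
        .append("SELECT sequence_name, payload FROM ")
        .append(table)
        .append(" WHERE dataSource = :dataSource AND start < :end AND ")
        .append(quote).append("end").append(quote) // "end" is quoted as in the PR
        .append(" > :start")
        .toString();
  }
}
```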
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1093,6 +1093,21 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
}
+ @Override
Review Comment:
Thanks!
##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
/**
* Fetches all the pending segments, whose interval overlaps with the given
- * search interval from the metadata store. Returns a Map from the
- * pending segment ID to the sequence name.
+ * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+ * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
*/
+ @VisibleForTesting
+ Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+ final Handle handle,
+ final String dataSource,
+ final Interval interval,
+ final Set<String> sequenceNamePrefixFilter
+ ) throws IOException
+ {
+ if (sequenceNamePrefixFilter.isEmpty()) {
+ return Collections.emptyMap();
+ }
+
+ final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
+ StringBuilder sql = new StringBuilder(
+ "SELECT sequence_name, payload FROM "
+ + dbTables.getPendingSegmentsTable()
+ + " WHERE dataSource = :dataSource AND start < :end AND "
+ + connector.getQuoteString() + "end" + connector.getQuoteString() + " > :start"
+ );
+
+ sql.append(" AND ( ");
+
+ for (int i = 1; i < sequenceNamePrefixes.size(); i++) {
+ sql.append("(sequence_name LIKE ")
+ .append(StringUtils.format(":prefix%d", i))
+ .append(")")
+ .append(" OR ");
+ }
+
+ sql.append("(sequence_name LIKE ")
+ .append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size()))
+ .append(")");
Review Comment:
It would be cleaner to just do this for each `i` from `0` to `sequenceNamePrefixes.size() - 1`, joining the conditions with `OR`:
```suggestion
sequenceFilterCondition.append(
StringUtils.format("(sequence_name LIKE :prefix%d)", i)
);
```
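A minimal sketch of the single 0-based loop, assuming the caller still returns an empty map early when the prefix set is empty (as the PR does), since zero prefixes would otherwise produce an empty `AND ()`. The placeholder names and the bound values share the same index, so the condition string and the bindings cannot drift apart. It uses `String.format` in place of Druid's `StringUtils.format`; the class and method names are hypothetical, and each entry of the bindings map would be passed to `query.bind(...)` as in the PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch: one 0-based loop for the OR-joined condition and its bindings. */
final class SequenceFilterSketch {

  /** Builds "(sequence_name LIKE :prefix0) OR ... OR (sequence_name LIKE :prefix{n-1})". */
  static String buildCondition(int prefixCount) {
    StringBuilder condition = new StringBuilder();
    for (int i = 0; i < prefixCount; i++) {
      if (i > 0) {
        condition.append(" OR ");
      }
      condition.append(String.format(Locale.ROOT, "(sequence_name LIKE :prefix%d)", i));
    }
    return condition.toString();
  }

  /** Maps each placeholder name to its bound value (prefix + "%"), using the same index. */
  static Map<String, String> buildBindings(List<String> prefixes) {
    Map<String, String> bindings = new LinkedHashMap<>();
    for (int i = 0; i < prefixes.size(); i++) {
      bindings.put(String.format(Locale.ROOT, "prefix%d", i), prefixes.get(i) + "%");
    }
    return bindings;
  }

  public static void main(String[] args) {
    List<String> prefixes = List.of("seqA", "seqB");
    System.out.println(buildCondition(prefixes.size()));
    // (sequence_name LIKE :prefix0) OR (sequence_name LIKE :prefix1)
    System.out.println(buildBindings(prefixes));
    // {prefix0=seqA%, prefix1=seqB%}
  }
}
```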
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -110,6 +111,14 @@ public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String
return Optional.absent();
}
+ public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
+ {
+ if (!supervisors.containsKey(activeSupervisorId)) {
Review Comment:
Please invert the if condition:
```
if contains:
return value
else
return empty
```
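A sketch of the inverted condition, assuming a simplified `Map` from supervisor ID to supervisor; the real `SupervisorManager` keeps more per entry, and `PrefixSupervisor` here is a hypothetical single-method stand-in for the `Supervisor` interface, not Druid's API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for the Supervisor interface, reduced to the method under review. */
interface PrefixSupervisor {
  Set<String> getActiveRealtimeSequencePrefixes();
}

/** Hypothetical sketch of the lookup with the positive branch first. */
final class SupervisorLookupSketch {
  // Assumption: a plain map from supervisor ID to supervisor.
  private final Map<String, PrefixSupervisor> supervisors = new HashMap<>();

  void register(String id, PrefixSupervisor supervisor) {
    supervisors.put(id, supervisor);
  }

  Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId) {
    if (supervisors.containsKey(activeSupervisorId)) {
      // Known supervisor: delegate to it.
      return supervisors.get(activeSupervisorId).getActiveRealtimeSequencePrefixes();
    } else {
      // Unknown supervisor: no active prefixes.
      return Collections.emptySet();
    }
  }
}
```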
--
This is an automated message from the Apache Git Service.