kfaraz commented on code in PR #15169:
URL: https://github.com/apache/druid/pull/15169#discussion_r1368165054


##########
server/src/main/java/org/apache/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java:
##########
@@ -347,10 +347,13 @@ SegmentPublishResult commitReplaceSegments(
    * </ul>
    *
    * @param replaceSegments Segments being committed by a REPLACE task
+   * @param activeRealtimeSequencePrefixes Set of base sequence names of active and pending completion task groups

Review Comment:
   Please update the javadoc to say prefixes instead of sequence names.



##########
extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java:
##########
@@ -217,6 +217,13 @@ public void testMaterializedViewSupervisorSpecCreated()
         Assert.assertTrue(e instanceof UnsupportedOperationException);
       }
 
+      try {

Review Comment:
   Use `Assert.assertThrows` instead and update the existing assertions in this 
method too.
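
   For reference, a minimal sketch of the try/catch pattern rewritten with `assertThrows`. To keep it compilable without JUnit, the `assertThrows` and `ThrowingRunnable` below are local stand-ins that mirror the shape of JUnit 4.13's `Assert.assertThrows(Class, ThrowingRunnable)`; in the actual test you would call the JUnit method directly. `unsupportedCall()` is a hypothetical placeholder for whichever supervisor method is under test.

   ```java
   final class AssertThrowsSketch
   {
     // Stand-in for org.junit.function.ThrowingRunnable (assumption: JUnit 4.13+ in the real test).
     interface ThrowingRunnable
     {
       void run() throws Throwable;
     }

     // Stand-in with the same shape as JUnit's Assert.assertThrows:
     // passes only if the runnable throws an instance of the expected type.
     static <T extends Throwable> T assertThrows(Class<T> expected, ThrowingRunnable runnable)
     {
       try {
         runnable.run();
       }
       catch (Throwable t) {
         if (expected.isInstance(t)) {
           return expected.cast(t);
         }
         throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
       }
       throw new AssertionError("Expected " + expected.getName() + " but nothing was thrown");
     }

     // Hypothetical placeholder for the unsupported supervisor method.
     static void unsupportedCall()
     {
       throw new UnsupportedOperationException("not supported");
     }

     static void exampleTest()
     {
       // Replaces: try { unsupportedCall(); } catch (Exception e) { assertTrue(e instanceof ...); }
       assertThrows(UnsupportedOperationException.class, () -> unsupportedCall());
     }
   }
   ```

   Unlike the try/catch form, this also fails the test when nothing is thrown at all.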



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
 
   /**
    * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval from the metadata store. Returns a Map from the
-   * pending segment ID to the sequence name.
+   * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+   * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
    */
+  @VisibleForTesting

Review Comment:
   Avoid using this. It is okay to just test the method which invokes this 
internally.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -1833,26 +1900,26 @@ private Set<DataSegment> announceHistoricalSegmentBatch(
         for (DataSegment segment : partition) {
           final String now = DateTimes.nowUtc().toString();
           preparedBatch.add()
-              .bind("id", segment.getId().toString())
-              .bind("dataSource", segment.getDataSource())
-              .bind("created_date", now)
-              .bind("start", segment.getInterval().getStart().toString())
-              .bind("end", segment.getInterval().getEnd().toString())
-              .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
-              .bind("version", segment.getVersion())
-              .bind("used", usedSegments.contains(segment))
-              .bind("payload", jsonMapper.writeValueAsBytes(segment))
-              .bind("used_status_last_updated", now);
+                       .bind("id", segment.getId().toString())

Review Comment:
   Since this is a sensitive PR for a new feature, let's revert all these 
formatting changes for now. Better to do reformatting in refactor PRs.
   
   Please revert all formatting changes in other lines in this class too.



##########
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java:
##########
@@ -93,4 +94,9 @@ default Boolean isHealthy()
   LagStats computeLagStats();
 
   int getActiveTaskGroupsCount();
+
+  /**
+   * @return active base sequence names for reading and pending completion task groups of a seekable stream supervisor

Review Comment:
   Update: prefixes instead of sequence names.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
 
   /**
    * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval from the metadata store. Returns a Map from the
-   * pending segment ID to the sequence name.
+   * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+   * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
    */
+  @VisibleForTesting
+  Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+      final Handle handle,
+      final String dataSource,
+      final Interval interval,
+      final Set<String> sequenceNamePrefixFilter
+  ) throws IOException
+  {
+    if (sequenceNamePrefixFilter.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
+    StringBuilder sql = new StringBuilder(
+        "SELECT sequence_name, payload FROM "
+        + dbTables.getPendingSegmentsTable()
+        + " WHERE dataSource = :dataSource AND start < :end AND "
+        + connector.getQuoteString() + "end" + connector.getQuoteString() + " > :start"
+    );
+
+    sql.append(" AND ( ");
+
+    for (int i = 1; i < sequenceNamePrefixes.size(); i++) {
+      sql.append("(sequence_name LIKE ")
+         .append(StringUtils.format(":prefix%d", i))
+         .append(")")
+         .append(" OR ");
+    }
+
+    sql.append("(sequence_name LIKE ")
+       .append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size()))
+       .append(")");
+
+    sql.append(" )");
+
+    Query<Map<String, Object>> query =
+        handle.createQuery(sql.toString())
+              .bind("dataSource", dataSource)
+              .bind("start", interval.getStart().toString())
+              .bind("end", interval.getEnd().toString());
+
+    for (int i = 1; i <= sequenceNamePrefixes.size(); i++) {
+      query.bind(StringUtils.format("prefix%d", i), sequenceNamePrefixes.get(i - 1) + "%");

Review Comment:
   What do we need the `%` for in `sequenceNamePrefixes.get(i - 1) + "%"`?



##########
extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java:
##########
@@ -295,6 +295,12 @@ public LagStats computeLagStats()
     throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
   }
 
+  @Override
+  public Set<String> getActiveBaseSequenceNames()
+  {
+    throw new UnsupportedOperationException("Get Active sequence names is not supported in MaterializedViewSupervisor");

Review Comment:
   Please fix this as the message is redundant.



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
 
   /**
    * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval from the metadata store. Returns a Map from the
-   * pending segment ID to the sequence name.
+   * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+   * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
    */
+  @VisibleForTesting
+  Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+      final Handle handle,
+      final String dataSource,
+      final Interval interval,
+      final Set<String> sequenceNamePrefixFilter
+  ) throws IOException
+  {
+    if (sequenceNamePrefixFilter.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
+    StringBuilder sql = new StringBuilder(

Review Comment:
   It would be much easier to read and understand the SQL if it were written using a format string.
   
   ```
   SELECT sequence_name, payload
   FROM %s
   WHERE dataSource=:dataSource
   AND start < :end
   AND end > :start
   AND (:sequenceFilterCondition)
   ```
   
   Then build a separate string for `sequenceFilterCondition`, which would basically be a set of `LIKE` filters joined by `OR`s.
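
   A rough sketch of that shape, not the final code. Assumptions: `String.format(Locale.ENGLISH, ...)` stands in for Druid's `StringUtils.format`, the table name and the already-quoted `end` column are passed in as plain parameters instead of coming from `dbTables` and `connector`, and the class and method names are made up for illustration. The overlap condition (`start < :end AND end > :start`) is the one from the PR's current code. Prefixes are numbered from 0 here, so the bind loop would have to use the same numbering.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Locale;

   final class PendingSegmentsSqlSketch
   {
     // Builds "(sequence_name LIKE :prefix0) OR (sequence_name LIKE :prefix1) ...".
     static String buildSequenceFilterCondition(int numPrefixes)
     {
       final List<String> conditions = new ArrayList<>();
       for (int i = 0; i < numPrefixes; i++) {
         conditions.add(String.format(Locale.ENGLISH, "(sequence_name LIKE :prefix%d)", i));
       }
       return String.join(" OR ", conditions);
     }

     // Assembles the full query from one format string so the SQL reads top to bottom.
     static String buildSql(String pendingSegmentsTable, String quotedEndColumn, int numPrefixes)
     {
       return String.format(
           Locale.ENGLISH,
           "SELECT sequence_name, payload FROM %s"
           + " WHERE dataSource = :dataSource"
           + " AND start < :end"
           + " AND %s > :start"
           + " AND (%s)",
           pendingSegmentsTable,
           quotedEndColumn,
           buildSequenceFilterCondition(numPrefixes)
       );
     }
   }
   ```

   Using `String.join` for the `OR`s also removes the need to special-case the last prefix.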



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
 
   /**
    * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval from the metadata store. Returns a Map from the
-   * pending segment ID to the sequence name.
+   * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+   * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
    */
+  @VisibleForTesting
+  Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+      final Handle handle,
+      final String dataSource,
+      final Interval interval,
+      final Set<String> sequenceNamePrefixFilter
+  ) throws IOException
+  {
+    if (sequenceNamePrefixFilter.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
+    StringBuilder sql = new StringBuilder(

Review Comment:
   Use `.append` instead of `+` as it is already a `StringBuilder`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1093,6 +1093,21 @@ public void resetOffsets(@Nonnull DataSourceMetadata resetDataSourceMetadata)
     addNotice(new ResetOffsetsNotice(resetDataSourceMetadata));
   }
 
+  @Override

Review Comment:
   Thanks!



##########
server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java:
##########
@@ -258,9 +258,73 @@ public int markSegmentsAsUnusedWithinInterval(String dataSource, Interval interv
 
   /**
    * Fetches all the pending segments, whose interval overlaps with the given
-   * search interval from the metadata store. Returns a Map from the
-   * pending segment ID to the sequence name.
+   * search interval and has a sequence_name that begins with one of the prefixes in sequenceNamePrefixFilter
+   * from the metadata store. Returns a Map from the pending segment ID to the sequence name.
    */
+  @VisibleForTesting
+  Map<SegmentIdWithShardSpec, String> getPendingSegmentsForIntervalWithHandle(
+      final Handle handle,
+      final String dataSource,
+      final Interval interval,
+      final Set<String> sequenceNamePrefixFilter
+  ) throws IOException
+  {
+    if (sequenceNamePrefixFilter.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    final List<String> sequenceNamePrefixes = new ArrayList<>(sequenceNamePrefixFilter);
+    StringBuilder sql = new StringBuilder(
+        "SELECT sequence_name, payload FROM "
+        + dbTables.getPendingSegmentsTable()
+        + " WHERE dataSource = :dataSource AND start < :end AND "
+        + connector.getQuoteString() + "end" + connector.getQuoteString() + " > :start"
+    );
+
+    sql.append(" AND ( ");
+
+    for (int i = 1; i < sequenceNamePrefixes.size(); i++) {
+      sql.append("(sequence_name LIKE ")
+         .append(StringUtils.format(":prefix%d", i))
+         .append(")")
+         .append(" OR ");
+    }
+
+    sql.append("(sequence_name LIKE ")
+       .append(StringUtils.format(":prefix%d", sequenceNamePrefixes.size()))
+       .append(")");

Review Comment:
   It would be cleaner to do this in a single loop, `for i = 0 to sequenceNamePrefixes.size()`:
   
   ```suggestion
           sequenceFilterCondition.append(
               StringUtils.format("(sequence_name LIKE :prefix%d)", i)
           )
   ```
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java:
##########
@@ -110,6 +111,14 @@ public Optional<String> getActiveSupervisorIdForDatasourceWithAppendLock(String
     return Optional.absent();
   }
 
+  public Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
+  {
+    if (!supervisors.containsKey(activeSupervisorId)) {

Review Comment:
   Please invert the if condition:
   ```
   if contains:
     return value
   else:
     return empty
   ```  
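
   In Java this could look roughly like the sketch below. The `Supervisor` interface and the plain `Map` are simplified, hypothetical stand-ins for the real types held by `SupervisorManager`; only the shape of the condition matters here.

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   final class SupervisorLookupSketch
   {
     // Hypothetical, simplified stand-in for the Supervisor interface.
     interface Supervisor
     {
       Set<String> getActiveRealtimeSequencePrefixes();
     }

     // Assumption: a plain map keyed by supervisor id, for illustration only.
     private final Map<String, Supervisor> supervisors = new HashMap<>();

     void register(String supervisorId, Supervisor supervisor)
     {
       supervisors.put(supervisorId, supervisor);
     }

     Set<String> getActiveRealtimeSequencePrefixes(String activeSupervisorId)
     {
       // Positive condition first: the common case reads before the fallback.
       if (supervisors.containsKey(activeSupervisorId)) {
         return supervisors.get(activeSupervisorId).getActiveRealtimeSequencePrefixes();
       } else {
         return Collections.emptySet();
       }
     }
   }
   ```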



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

