Copilot commented on code in PR #18558:
URL: https://github.com/apache/druid/pull/18558#discussion_r2480180080


##########
server/src/main/java/org/apache/druid/client/CachingClusteredClient.java:
##########
@@ -476,15 +486,44 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
           ServerSelector server = chunk.getObject();
           if (isRealtimeSegmentOnly && !server.isRealtimeSegment()) {
             continue; // Skip historical segments when only realtime segments are requested
+          } else if (brokerSegmentWatcherConfig.detectUnavailableSegments() && !server.isQueryable()) {
+            log.debug("ServerSelector for segment id [%s] is not queryable", server.getSegment().getId());
+            continue;
           }
+
           final SegmentDescriptor segment = new SegmentDescriptor(
               holder.getInterval(),
               holder.getVersion(),
               chunk.getChunkNumber()
           );
           segments.add(new SegmentServerSelector(server, segment));
+          if (server.isEmpty()) {
+            unavailableSegmentsIds.add(server.getSegment().getId());
+          }
+        }
+      }
+
+      if (brokerSegmentWatcherConfig.detectUnavailableSegments() && !unavailableSegmentsIds.isEmpty()) {
+        queryPlus.getQueryMetrics().reportUnavailableSegmentCount(unavailableSegmentsIds.size()).emit(emitter);
+        log.warn(
+            "Detected [%d] unavailable segments, trimmed segment ids: [%s]",
+            unavailableSegmentsIds.size(),
+            unavailableSegmentsIds.subList(0, 10)
+        );
+        if (unavailableSegmentsAction == UnavailableSegmentsAction.FAIL) {
+          throw new QueryException(
+              QueryException.UNAVAILABLE_SEGMENTS_ERROR_CODE,
+              StringUtils.format(
+                  "Detected [%d] unavailable segments, trimmed segment ids: [%s]",
+                  unavailableSegmentsIds.size(),
+                  unavailableSegmentsIds.subList(0, 100)

Review Comment:
   Same issue as the previous comment - the code doesn't validate that the list 
size is at least 100 before calling `subList(0, 100)`. Consider using 
`Math.min(100, unavailableSegmentsIds.size())` to avoid potential 
IndexOutOfBoundsException.
   ```suggestion
                     unavailableSegmentsIds.subList(0, Math.min(100, unavailableSegmentsIds.size()))
   ```



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -56,7 +62,61 @@
 import java.util.stream.Collectors;
 
 /**
- *
+ * <p>
+ * Maintains a timeline of segments per datasource.
+ * This timeline is populated by callback received from datanodes when they load a segment
+ * or when realtime segments are created in a task.
+ * Downstream classes can also register timeline callback on this class, for example BrokerSegmentMetadataCache.
+ * </p>
+ * <p>
+ * There is a second flow which is enabled only when unavailabe segment detection is turned on.

Review Comment:
   Corrected spelling of 'unavailabe' to 'unavailable'.
   ```suggestion
    * There is a second flow which is enabled only when unavailable segment detection is turned on.
   ```



##########
server/src/main/java/org/apache/druid/server/http/MetadataResource.java:
##########
@@ -201,6 +206,138 @@ public Response getAllUsedSegments(
     }
   }
 
+  /**
+   * <p>This endpoint is used by MetadataSegmentView in broker to keep an up-to-date list of segments present in the system.
+   * This endpoint lists segments present in the system and can also incrementally provide the segments added/dropped
+   * since last response.</p>
+   * <br>Flow
+   * <ol>
+   * <li>Client sends first request /druid/coordinator/v1/metadata/changedSegments?counter=-1
+   * Server responds with list of segments currently present and a <counter,hash> pair. </li>
+   * <li>Client sends subsequent requests /druid/coordinator/v1/metadata/changedSegments?counter=<counter>&hash=<hash>
+   * Where <counter,hash> values are used from the last response. Server responds with list of segment updates
+   * since given counter.</li>
+   * </ol>
+   *
+   * @param req request
+   * @param dataSources requested datasources
+   * @param counter counter received in last response.
+   * @param hash hash received in last response.
+   */
+  @GET
+  @Path("/changedSegments")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getChangedSegments(
+      @Context final HttpServletRequest req,
+      @QueryParam("datasources") final @Nullable Set<String> dataSources,
+      @QueryParam("counter") long counter,
+      @QueryParam("hash") long hash
+  )
+  {
+    Set<String> requiredDataSources = (null == dataSources) ? new HashSet<>() : dataSources;
+
+    log.debug(
+        "Changed segments requested. counter [%d], hash [%d], dataSources [%s]",
+        counter,
+        hash,
+        requiredDataSources
+    );
+
+    DataSourcesSnapshot dataSourcesSnapshot = segmentsMetadataManager.getSnapshotOfDataSourcesWithAllUsedSegments();
+    ChangeRequestHistory<List<DataSegmentChange>> changeRequestHistory = segmentsMetadataManager.getChangeRequestHistory();
+
+    ChangeRequestsSnapshot<List<DataSegmentChange>> changeRequestsSnapshot = changeRequestHistory.getRequestsSinceSync(
+        new ChangeRequestHistory.Counter(counter, hash));
+    List<List<DataSegmentChange>> requests = changeRequestsSnapshot.getRequests();
+    List<DataSegmentChange> flatRequests = new ArrayList<>();
+    if (null != requests) {
+      requests.forEach(flatRequests::addAll);
+    }
+
+    List<DataSegmentChange> dataSegmentChanges;
+    ChangeRequestHistory.Counter lastCounter = changeRequestsSnapshot.getCounter();
+    boolean reset = false;
+    String resetCause = "";
+    if (changeRequestsSnapshot.isResetCounter()) {
+      reset = true;
+      dataSegmentChanges =
+          dataSourcesSnapshot
+              .getDataSourcesWithAllUsedSegments()
+              .stream()
+              .flatMap(druidDataSource -> druidDataSource.getSegments().stream())
+              .filter(segment -> requiredDataSources.isEmpty()
+                                 || requiredDataSources.contains(segment.getDataSource()))
+              .map(segment -> {
+                Long numRows = null;
+                if (coordinatorSegmentMetadataCache != null) {
+                  AvailableSegmentMetadata availableSegmentMetadata = coordinatorSegmentMetadataCache.getAvailableSegmentMetadata(
+                      segment.getDataSource(),
+                      segment.getId()
+                  );
+                  if (null != availableSegmentMetadata) {
+                    numRows = availableSegmentMetadata.getNumRows();
+                  }
+                }
+                return new DataSegmentChange(
+                    new SegmentStatusInCluster(
+                        segment,
+                        dataSourcesSnapshot.getOvershadowedSegments().contains(segment),
+                        null,
+                        numRows,
+                        false, //

Review Comment:
   Incomplete inline comment. The comment appears to be cut off or empty. 
Either complete the comment to explain why this parameter is false, or remove 
the comment marker.
   ```suggestion
                           false,
   ```
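The counter/hash flow described in the `getChangedSegments` Javadoc can be modelled from the client side roughly as follows. This is a minimal, hypothetical sketch: the class `ChangedSegmentsSync` and its `Change` and `Response` types are illustrative assumptions, because the diff does not show the endpoint's JSON payload. It only captures the bookkeeping: start with `counter=-1`, replace local state when the server signals a reset (as `isResetCounter()` does above), otherwise apply deltas, and echo back the latest `<counter,hash>` pair.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical client-side model of the /changedSegments sync loop; the
// Change/Response shapes are assumptions, not Druid's actual payload types.
final class ChangedSegmentsSync
{
  // One segment update: added == false means the segment was dropped.
  static final class Change
  {
    final String segmentId;
    final boolean added;

    Change(String segmentId, boolean added)
    {
      this.segmentId = segmentId;
      this.added = added;
    }
  }

  // Assumed response: a <counter, hash> pair, a reset flag and the changes.
  static final class Response
  {
    final long counter;
    final long hash;
    final boolean reset;
    final List<Change> changes;

    Response(long counter, long hash, boolean reset, List<Change> changes)
    {
      this.counter = counter;
      this.hash = hash;
      this.reset = reset;
      this.changes = changes;
    }
  }

  private final Set<String> segments = new HashSet<>();
  private long counter = -1; // -1 requests the full list of current segments
  private long hash = 0;

  // Query string for the next poll, following the two-step flow in the Javadoc.
  String nextQuery()
  {
    return counter < 0 ? "counter=-1" : "counter=" + counter + "&hash=" + hash;
  }

  void apply(Response response)
  {
    if (response.reset) {
      // A reset carries a full listing, so local state is discarded first.
      segments.clear();
    }
    for (Change change : response.changes) {
      if (change.added) {
        segments.add(change.segmentId);
      } else {
        segments.remove(change.segmentId);
      }
    }
    // Remember the pair to send back on the next request.
    counter = response.counter;
    hash = response.hash;
  }

  Set<String> segments()
  {
    return Collections.unmodifiableSet(segments);
  }
}
```

On the very first call the local set is empty, so in this model the outcome is the same whether or not the server flags that first response as a reset.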



##########
server/src/main/java/org/apache/druid/metadata/SqlSegmentsMetadataManager.java:
##########
@@ -804,13 +856,57 @@ private void createDatasourcesSnapshot(DateTime snapshotTime, List<DataSegment>
     // Updates outside of database polls were primarily for the user experience, so users would immediately see the
     // effect of a segment mark call reflected in MetadataResource API calls.
 
-    dataSourcesSnapshot = DataSourcesSnapshot.fromUsedSegments(
-        Iterables.filter(segments, Objects::nonNull), // Filter corrupted entries (see above in this method).
-        snapshotTime
-    );
-    emitMetric("segment/buildSnapshot/time", stopwatch.millisElapsed());
-    log.debug(
-        "Created snapshot from polled segments in [%d]ms. Found [%d] overshadowed segments.",
+    if (segments.isEmpty()) {
+      log.debug("No segments found in the database!");
+    } else {
+      log.info("Polled and found %,d segments in the database", segments.size());
+    }

Review Comment:
   [nitpick] The empty segment check and logging were added here, but this 
duplicates the logging that occurs after the snapshot is created at line 908. 
Consider consolidating the logging to avoid redundancy.
   ```suggestion
       // Removed redundant logging of segment count; snapshot creation log below suffices.
   ```



##########
server/src/main/java/org/apache/druid/client/CachingClusteredClient.java:
##########
@@ -476,15 +486,44 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
           ServerSelector server = chunk.getObject();
           if (isRealtimeSegmentOnly && !server.isRealtimeSegment()) {
             continue; // Skip historical segments when only realtime segments are requested
+          } else if (brokerSegmentWatcherConfig.detectUnavailableSegments() && !server.isQueryable()) {
+            log.debug("ServerSelector for segment id [%s] is not queryable", server.getSegment().getId());
+            continue;
           }
+
           final SegmentDescriptor segment = new SegmentDescriptor(
               holder.getInterval(),
               holder.getVersion(),
               chunk.getChunkNumber()
           );
           segments.add(new SegmentServerSelector(server, segment));
+          if (server.isEmpty()) {
+            unavailableSegmentsIds.add(server.getSegment().getId());
+          }
+        }
+      }
+
+      if (brokerSegmentWatcherConfig.detectUnavailableSegments() && !unavailableSegmentsIds.isEmpty()) {
+        queryPlus.getQueryMetrics().reportUnavailableSegmentCount(unavailableSegmentsIds.size()).emit(emitter);
+        log.warn(
+            "Detected [%d] unavailable segments, trimmed segment ids: [%s]",
+            unavailableSegmentsIds.size(),
+            unavailableSegmentsIds.subList(0, 10)

Review Comment:
   The code doesn't validate that the list size is at least 10 before calling 
`subList(0, 10)`, which could throw an IndexOutOfBoundsException if there are 
fewer than 10 unavailable segments. Consider using `Math.min(10, 
unavailableSegmentsIds.size())` instead.
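A minimal sketch of the clamped truncation that both `subList` comments ask for, assuming `unavailableSegmentsIds` is a `java.util.List` (which the `subList` calls imply). The names `TrimmedIds` and `firstN` are hypothetical; inlining `Math.min` at each call site, as in the earlier suggestion, is equally valid.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating the clamp suggested in the review comments.
final class TrimmedIds
{
  private TrimmedIds()
  {
  }

  // Returns at most `limit` leading elements of `ids`. A bare ids.subList(0, limit)
  // throws IndexOutOfBoundsException whenever ids.size() < limit.
  static <T> List<T> firstN(List<T> ids, int limit)
  {
    if (limit < 0) {
      throw new IllegalArgumentException("limit must be non-negative: " + limit);
    }
    // Copy the view so the result does not depend on later changes to `ids`.
    return new ArrayList<>(ids.subList(0, Math.min(limit, ids.size())));
  }
}
```

With such a helper, the `log.warn` call would pass `TrimmedIds.firstN(unavailableSegmentsIds, 10)` and the `QueryException` message `TrimmedIds.firstN(unavailableSegmentsIds, 100)`.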



##########
sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java:
##########
@@ -682,11 +684,11 @@ public void testSegmentsTableWithProjection() throws JsonProcessingException
   {
     final SegmentsTable segmentsTable = new SegmentsTable(druidSchema, metadataView, MAPPER, authMapper);
     final Set<SegmentStatusInCluster> publishedSegments = new HashSet<>(Arrays.asList(
-        new SegmentStatusInCluster(publishedCompactedSegment1, true, 2, null, false),
-        new SegmentStatusInCluster(publishedCompactedSegment2, false, 0, null, false),
-        new SegmentStatusInCluster(publishedUncompactedSegment3, false, 2, null, false),
-        new SegmentStatusInCluster(segment1, true, 2, null, false),
-        new SegmentStatusInCluster(segment2, false, 0, null, false)
+        new SegmentStatusInCluster(publishedCompactedSegment1, true, 2, null, false,false),

Review Comment:
   Missing space after comma between the two boolean parameters. Should be 
`false, false` for consistency with the other lines in this test file.
   ```suggestion
           new SegmentStatusInCluster(publishedCompactedSegment1, true, 2, null, false, false),
   ```



##########
server/src/main/java/org/apache/druid/client/BrokerServerView.java:
##########
@@ -56,7 +62,61 @@
 import java.util.stream.Collectors;
 
 /**
- *
+ * <p>
+ * Maintains a timeline of segments per datasource.
+ * This timeline is populated by callback received from datanodes when they load a segment
+ * or when realtime segments are created in a task.
+ * Downstream classes can also register timeline callback on this class, for example BrokerSegmentMetadataCache.
+ * </p>
+ * <p>
+ * There is a second flow which is enabled only when unavailabe segment detection is turned on.
+ * {@link MetadataSegmentView} polls published segment metadata from the Coordinator. This class listens for published
+ * segment updates.
+ * The segment which are marked as `loaded` (used segment with non-zero replication factor which
+ * has been once loaded onto some historical) is added to the timeline. This is basically the set of segments that
+ * should be available for querying.
+ * </p>
+ * <p>
+ * Lifecycle for a segment s,
+ * <br>- s is created at time t1, ie. metadata for it is published in the database at time t1
+ * <br>- coordinator polls it at time t2, at this point the segment s is not considered as loaded
+ * <br>- broker polls coordinator at time t3 and finds segment s, but it is not added to the timeline since it is not loaded
+ * <br>- historical loads s at time t4
+ * <br>- coordinator and broker both receives callback from the historical
+ * <br>- coordinator marks the segment as `loaded` at time t5
+ * </p>
+ * now, lets consider two possibilities,
+ * <ol>
+ * <li>Broker receives segment metadata for s from coordinator before the callback from historical
+ * <ul>
+ *   <li>
+ *     broker adds s to the timeline at t6 with `queryable` field set to true,
+ *     at this point any query that uses s would find that s is unavailable
+ *   </li>
+ *   <li>
+ *     broker recieves callback for s from some historical at t7,

Review Comment:
   Corrected spelling of 'recieves' to 'receives'.
   ```suggestion
    *     broker receives callback for s from some historical at t7,
   ```
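To connect the t6/t7 lifecycle in this Javadoc with the detection logic in `CachingClusteredClient.computeSegmentsToQuery`, here is a hypothetical, simplified model. `SegmentEntry` stands in for a `ServerSelector` (its `queryable` flag for `isQueryable()`, an empty `servers` set for `isEmpty()`); these names are assumptions and the real classes carry far more state. The filtering mirrors the diff: non-queryable entries are skipped when detection is enabled, and the remaining entries with no serving server are reported as unavailable.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the unavailable-segment check in the diff.
final class UnavailableSegmentsModel
{
  // Stand-in for a ServerSelector: segment id, queryable flag, serving servers.
  static final class SegmentEntry
  {
    final String segmentId;
    final boolean queryable;
    final Set<String> servers = new HashSet<>();

    SegmentEntry(String segmentId, boolean queryable)
    {
      this.segmentId = segmentId;
      this.queryable = queryable;
    }
  }

  // Returns ids of entries that would be reported as unavailable.
  static List<String> unavailableIds(List<SegmentEntry> entries, boolean detectUnavailableSegments)
  {
    List<String> unavailable = new ArrayList<>();
    for (SegmentEntry entry : entries) {
      if (detectUnavailableSegments && !entry.queryable) {
        continue; // skipped, like a non-queryable ServerSelector in the diff
      }
      if (entry.servers.isEmpty()) {
        // In the timeline, but no server has announced it yet.
        unavailable.add(entry.segmentId);
      }
    }
    return unavailable;
  }
}
```

In this model, `s` at t6 is queryable with no servers and shows up as unavailable; once a historical's callback adds a server for it (the t7 step), it drops out of the list.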



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

