kfaraz commented on code in PR #18561:
URL: https://github.com/apache/druid/pull/18561#discussion_r2370982111


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java:
##########
@@ -72,7 +72,8 @@ protected String runTask(TaskBuilder<?, ?, ?, ?> taskBuilder)
   {
     final String taskId = IdUtils.getRandomId();
     cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId), overlord);
-    cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
+    boolean useCentralizedSchema = Boolean.parseBoolean((String) cluster.getCommonProperties().getOrDefault("druid.centralizedDatasourceSchema.enabled", "false"));

Review Comment:
   ```suggestion
       boolean useCentralizedSchema = Boolean.parseBoolean(cluster.getCommonProperties().getProperty("druid.centralizedDatasourceSchema.enabled", "false"));
   ```
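
A minimal sketch of the difference between the two lookups, assuming `getCommonProperties()` returns a `java.util.Properties` (the suggestion implies this, but the diff does not show it). `getOrDefault` is inherited from `Hashtable<Object, Object>` and returns `Object`, hence the cast; `getProperty(key, defaultValue)` already returns a `String`. The class name `SchemaFlagExample` is hypothetical.

```java
import java.util.Properties;

public class SchemaFlagExample
{
  static final String KEY = "druid.centralizedDatasourceSchema.enabled";

  // Form used in the diff: getOrDefault returns Object, so a cast is required.
  static boolean viaGetOrDefault(Properties props)
  {
    return Boolean.parseBoolean((String) props.getOrDefault(KEY, "false"));
  }

  // Suggested form: getProperty returns String directly, no cast needed.
  static boolean viaGetProperty(Properties props)
  {
    return Boolean.parseBoolean(props.getProperty(KEY, "false"));
  }

  public static void main(String[] args)
  {
    Properties props = new Properties();
    System.out.println(viaGetProperty(props));   // key absent, default applies
    props.setProperty(KEY, "true");
    System.out.println(viaGetProperty(props));   // key present
  }
}
```

Both forms agree when the stored value is a `String`; the suggested one simply avoids the unchecked-looking cast.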



##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/schema/CentralizedSchemaMetadataQueryDisabledTest.java:
##########
@@ -39,14 +38,14 @@ private static EmbeddedDruidCluster configureCluster(EmbeddedDruidCluster cluste
            .addCommonProperty("druid.centralizedDatasourceSchema.backFillEnabled", "true")
            .addCommonProperty("druid.centralizedDatasourceSchema.backFillPeriod", "500")
            .addCommonProperty("druid.coordinator.segmentMetadata.disableSegmentMetadataQueries", "true")
+           .addCommonProperty("druid.sql.planner.metadataRefreshPeriod", "PT0.1s")

Review Comment:
   This property is already added to `EmbeddedBroker`. Do we need to add it 
here too?



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java:
##########
@@ -223,8 +223,23 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
     polledDataSourceMetadata.forEach(this::updateDSMetadata);
 
     // Remove segments of the datasource from refresh list for which we received schema from the Coordinator.
+    final Map<String, Integer> segmentsSkippedPerDatasource = new HashMap<>();
+
+    // Count segments per datasource before removal
+    for (SegmentId segmentId : segmentsToRefresh) {
+      if (polledDataSourceMetadata.containsKey(segmentId.getDataSource())) {
+        segmentsSkippedPerDatasource.merge(segmentId.getDataSource(), 1, Integer::sum);
+      }
+    }
+
     segmentsToRefresh.removeIf(segmentId -> polledDataSourceMetadata.containsKey(segmentId.getDataSource()));
 
+    // Emit metrics per datasource
+    segmentsSkippedPerDatasource.forEach((dataSource, count) -> {
+      emitMetric(Metric.BROKER_SEGMENTS_SKIPPED_REFRESH, count,
+                 new ServiceMetricEvent.Builder().setDimension(DruidMetrics.DATASOURCE, dataSource));

Review Comment:
   Please put each arg on a separate line.



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java:
##########
@@ -223,8 +223,23 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
     polledDataSourceMetadata.forEach(this::updateDSMetadata);
 
     // Remove segments of the datasource from refresh list for which we received schema from the Coordinator.
+    final Map<String, Integer> segmentsSkippedPerDatasource = new HashMap<>();
+
+    // Count segments per datasource before removal
+    for (SegmentId segmentId : segmentsToRefresh) {
+      if (polledDataSourceMetadata.containsKey(segmentId.getDataSource())) {
+        segmentsSkippedPerDatasource.merge(segmentId.getDataSource(), 1, Integer::sum);
+      }
+    }
+
     segmentsToRefresh.removeIf(segmentId -> polledDataSourceMetadata.containsKey(segmentId.getDataSource()));
 
+    // Emit metrics per datasource
+    segmentsSkippedPerDatasource.forEach((dataSource, count) -> {

Review Comment:
   Nit: Lambda can be simplified
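
One possible reading of this nit, sketched under assumptions: since the lambda body is a single statement, the braces can be dropped and the body written as an expression. The `emit` helper below is a hypothetical stand-in for `emitMetric` that only records its arguments; it is not the Druid API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LambdaSimplifyExample
{
  // Hypothetical stand-in for emitMetric: records what would have been emitted.
  static final List<String> EMITTED = new ArrayList<>();

  static void emit(String dataSource, int count)
  {
    EMITTED.add(dataSource + "=" + count);
  }

  static void emitAll(Map<String, Integer> datasourceToCount)
  {
    // Block form:      forEach((dataSource, count) -> { emit(dataSource, count); });
    // Expression form: same behavior, without braces or the trailing semicolon.
    datasourceToCount.forEach((dataSource, count) -> emit(dataSource, count));
  }

  public static void main(String[] args)
  {
    Map<String, Integer> counts = new LinkedHashMap<>();
    counts.put("wiki", 2);
    counts.put("koalas", 1);
    emitAll(counts);
    System.out.println(EMITTED);
  }
}
```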



##########
services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java:
##########
@@ -280,20 +280,31 @@ public void verifyNumVisibleSegmentsIs(int numExpectedSegments, String dataSourc
     );
   }
 
+  public void waitForAllSegmentsToBeAvailable(String dataSource, EmbeddedCoordinator coordinator, EmbeddedBroker broker)
+  {
+    waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker, false);
+  }
+
   /**
    * Waits for all used segments (including overshadowed) of the given datasource
    * to be queryable by Brokers.
    */
-  public void waitForAllSegmentsToBeAvailable(String dataSource, EmbeddedCoordinator coordinator, EmbeddedBroker broker)
+  public void waitForAllSegmentsToBeAvailable(

Review Comment:
   We shouldn't change this method. Please add a new method which waits on the 
new metric.



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java:
##########
@@ -223,8 +223,23 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
     polledDataSourceMetadata.forEach(this::updateDSMetadata);
 
     // Remove segments of the datasource from refresh list for which we received schema from the Coordinator.
+    final Map<String, Integer> segmentsSkippedPerDatasource = new HashMap<>();

Review Comment:
   ```suggestion
       final Map<String, Integer> datasourceToNumSegmentsSkipped = new HashMap<>();
   ```



##########
sql/src/main/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCache.java:
##########
@@ -223,8 +223,23 @@ public void refresh(final Set<SegmentId> segmentsToRefresh, final Set<String> da
     polledDataSourceMetadata.forEach(this::updateDSMetadata);
 
     // Remove segments of the datasource from refresh list for which we received schema from the Coordinator.
+    final Map<String, Integer> segmentsSkippedPerDatasource = new HashMap<>();

Review Comment:
   Please put the entire new logic in a separate method.
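
A rough sketch of what such an extraction could look like, not the PR's actual code. It is written against plain Java collections so it runs on its own: the method name `removeAndCountSkipped`, the generic segment type, and the `getDataSource` function parameter are all hypothetical stand-ins for `SegmentId` and `polledDataSourceMetadata`. It counts and removes in a single pass instead of a counting loop followed by `removeIf`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

public class SkippedSegmentCounter
{
  /**
   * Removes every segment whose datasource is in polledDataSources from
   * segmentsToRefresh and returns the number removed per datasource.
   */
  static <T> Map<String, Integer> removeAndCountSkipped(
      Set<T> segmentsToRefresh,
      Set<String> polledDataSources,
      Function<T, String> getDataSource
  )
  {
    final Map<String, Integer> datasourceToNumSegmentsSkipped = new HashMap<>();
    final Iterator<T> it = segmentsToRefresh.iterator();
    while (it.hasNext()) {
      final String dataSource = getDataSource.apply(it.next());
      if (polledDataSources.contains(dataSource)) {
        datasourceToNumSegmentsSkipped.merge(dataSource, 1, Integer::sum);
        it.remove();
      }
    }
    return datasourceToNumSegmentsSkipped;
  }

  public static void main(String[] args)
  {
    // Segments are modeled as "datasource_suffix" strings for illustration only.
    Set<String> segments = new HashSet<>(Arrays.asList("wiki_1", "wiki_2", "koalas_1"));
    Map<String, Integer> skipped = removeAndCountSkipped(
        segments,
        Collections.singleton("wiki"),
        id -> id.substring(0, id.indexOf('_'))
    );
    System.out.println(skipped + " remaining=" + segments);
  }
}
```

The caller in `refresh` would then only need to invoke the helper and emit one metric per entry of the returned map.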



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

