findingrish commented on code in PR #16676:
URL: https://github.com/apache/druid/pull/16676#discussion_r1665249589


##########
server/src/main/java/org/apache/druid/server/coordinator/SegmentReplicationStatusManager.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator;
+
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntMaps;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.Object2LongMap;
+import org.apache.druid.guice.LazySingleton;
+import org.apache.druid.metadata.SegmentsMetadataManager;
+import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
+import org.apache.druid.server.coordinator.duty.RunRules;
+import org.apache.druid.server.coordinator.loading.SegmentReplicaCount;
+import org.apache.druid.server.coordinator.loading.SegmentReplicationStatus;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.Dimension;
+import org.apache.druid.server.coordinator.stats.RowKey;
+import org.apache.druid.server.coordinator.stats.Stats;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+
+import javax.annotation.Nullable;
+import javax.inject.Inject;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Manages information about replication status of segments in a cluster.
+ */
+@LazySingleton
+public class SegmentReplicationStatusManager
+{
+  private final MetadataManager metadataManager;
+
+  /**
+   * Set of broadcast segments determined in the latest coordinator run of the {@link RunRules} duty.
+   * This might contain stale information if the Coordinator duties haven't run or are delayed.
+   */
+  private volatile Set<DataSegment> broadcastSegments = null;
+
+  /**
+   * Used to determine count of under-replicated or unavailable segments.
+   * Updated in each coordinator run in the {@link SegmentReplicationStatusManager.UpdateReplicationStatus} duty.
+   * <p>
+   * This might have stale information if coordinator runs are delayed. But as
+   * long as the {@link SegmentsMetadataManager} has the latest information of
+   * used segments, we would only have false negatives and not false positives.
+   * In other words, we might report some segments as under-replicated or
+   * unavailable even if they are fully replicated. But if a segment is reported
+   * as fully replicated, it is guaranteed to be so.
+   */
+  private volatile SegmentReplicationStatus segmentReplicationStatus = null;
+
+  @Inject
+  public SegmentReplicationStatusManager(MetadataManager metadataManager)
+  {
+    this.metadataManager = metadataManager;
+  }
+
+  /**
+   * @return Set of broadcast segments determined by the latest run of the {@link RunRules} duty.
+   * If the coordinator runs haven't triggered or are delayed, this information may be stale.
+   */
+  @Nullable
+  public Set<DataSegment> getBroadcastSegments()
+  {
+    return broadcastSegments;
+  }
+
+  public Object2IntMap<String> getDatasourceToUnavailableSegmentCount()
+  {
+    if (segmentReplicationStatus == null) {
+      return Object2IntMaps.emptyMap();
+    }
+
+    final Object2IntOpenHashMap<String> datasourceToUnavailableSegments = new 
Object2IntOpenHashMap<>();
+
+    final Iterable<DataSegment> dataSegments = 
metadataManager.segments().iterateAllUsedSegments();
+    for (DataSegment segment : dataSegments) {
+      SegmentReplicaCount replicaCount = 
segmentReplicationStatus.getReplicaCountsInCluster(segment.getId());
+      if (replicaCount != null && (replicaCount.totalLoaded() > 0 || 
replicaCount.required() == 0)) {
+        datasourceToUnavailableSegments.addTo(segment.getDataSource(), 0);
+      } else {
+        datasourceToUnavailableSegments.addTo(segment.getDataSource(), 1);
+      }
+    }
+
+    return datasourceToUnavailableSegments;
+  }
+
+  public Object2IntMap<String> 
getDatasourceToDeepStorageQueryOnlySegmentCount()

Review Comment:
   Currently, `DruidCoordinator` has a dependency on 
`CoordinatorSegmentMetadataCache`. For this patch, I need to use 
`DruidCoordinator#getSegmentReplicationFactor` in 
`CoordinatorSegmentMetadataCache`, which would create a cyclic dependency. 
   
   As a solution, I have refactored `DruidCoordinator` to move the code that 
updates `segmentReplicationStatus` and `broadcastSegments` into a separate class, `SegmentReplicationStatusManager`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to