abhishekagarwal87 commented on code in PR #14517:
URL: https://github.com/apache/druid/pull/14517#discussion_r1252540327


##########
docs/operations/metrics.md:
##########
@@ -324,6 +325,8 @@ These metrics are for the Druid Coordinator and are reset each time the Coordina
 |`metadata/kill/rule/count`|Total number of rules that were automatically deleted from metadata store per each Coordinator kill rule duty run. This metric can help adjust `druid.coordinator.kill.rule.durationToRetain` configuration based on whether more or less rules need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.rule.on` is set to true.| |Varies|
 |`metadata/kill/datasource/count`|Total number of datasource metadata that were automatically deleted from metadata store per each Coordinator kill datasource duty run (Note: datasource metadata only exists for datasource created from supervisor). This metric can help adjust `druid.coordinator.kill.datasource.durationToRetain` configuration based on whether more or less datasource metadata need to be deleted per cycle. Note that this metric is only emitted when `druid.coordinator.kill.datasource.on` is set to true.| |Varies|
 |`init/serverview/time`|Time taken to initialize the coordinator server view.||Depends on the number of segments|
+|`segment/serverview/sync/healthy`|Sync status of the Broker with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `segment/serverview/sync/unstableTime` to debug slow startup of the Coordinator.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
+|`segment/serverview/sync/unstableTime`|Time in milliseconds for which the Broker has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|

Review Comment:
   ```suggestion
   |`segment/serverview/sync/healthy`|Sync status of the Coordinator with a segment-loading server such as a Historical or Peon. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled. This metric can be used in conjunction with `segment/serverview/sync/unstableTime` to debug slow startup of the Coordinator.|`server`, `tier`|1 for fully synced servers, 0 otherwise|
   |`segment/serverview/sync/unstableTime`|Time in milliseconds for which the Coordinator has been failing to sync with a segment-loading server. Emitted only when [HTTP-based server view](../configuration/index.md#segment-management) is enabled.|`server`, `tier`|Not emitted for synced servers.|
   ```



##########
server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java:
##########
@@ -396,35 +397,38 @@ private void addNextSyncToWorkQueue()
       }
       catch (Throwable th) {
         if (executor.isShutdown()) {
-          log.warn(
-              th,
-              "Couldn't schedule next sync. [%s] is not being synced any more, probably because executor is stopped.",
-              logIdentity
-          );
+          log.warn(th, "Could not schedule sync for server[%s] because executor is stopped.", logIdentity);
         } else {
           log.makeAlert(
               th,
-              "Couldn't schedule next sync. [%s] is not being synced any more, restarting Druid process on that "
-              + "server might fix the issue.",
+              "Could not schedule sync for server [%s]. Try restarting the Druid process on that server.",
               logIdentity
           ).emit();
         }
       }
     }
   }
 
-  private boolean incrementFailedAttemptAndCheckUnstabilityTimeout()
+  private void markServerUnstableAndAlert(Throwable throwable, String action)
   {
-    if (consecutiveFailedAttemptCount > 0
-        && (System.currentTimeMillis() - unstableStartTime) > serverUnstabilityTimeout) {
-      return true;
-    }
-
     if (consecutiveFailedAttemptCount++ == 0) {
-      unstableStartTime = System.currentTimeMillis();
+      sinceUnstable.restart();
     }
 
-    return false;
+    final long unstableSeconds = getUnstableTimeMillis() / 1000;
+    final String message = StringUtils.format(

Review Comment:
   Is there anything an admin can do when they see this message?
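One possible answer to this question is to have the message report what an admin needs in order to act: how many consecutive attempts failed, for how long, and a concrete next step. The sketch below is hypothetical and is not Druid's code. The class `UnstableTracker`, its methods `markFailure`/`markSuccess`, the injectable clock, and the wording of the hint are all illustrative assumptions. Its unstable-time logic follows the `markServerUnstableAndAlert` hunk above. The clock starts on the first consecutive failure, and `getUnstableTimeMillis()` returns 0 once the failure count is reset. In this sketch, `markSuccess()` performs that reset.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch (not Druid's ChangeRequestHttpSyncer): tracks how long a
 * server has been failing to sync and builds an actionable message.
 * Single-threaded for clarity.
 */
class UnstableTracker {
  private final LongSupplier clockMillis; // injectable clock so behavior is deterministic in tests
  private int consecutiveFailedAttemptCount = 0;
  private long unstableSinceMillis = 0;

  UnstableTracker(LongSupplier clockMillis) {
    this.clockMillis = clockMillis;
  }

  /** Records a failed attempt and returns a message that tells the admin what to check. */
  String markFailure(String server, String action) {
    if (consecutiveFailedAttemptCount++ == 0) {
      // First failure in a row: start measuring unstable time from now
      unstableSinceMillis = clockMillis.getAsLong();
    }
    long unstableSeconds = getUnstableTimeMillis() / 1000;
    // Hypothetical wording: includes the numbers plus a concrete next step
    return String.format(
        "Could not %s server[%s]. Failed [%d] times in a row over the last [%d] seconds."
        + " Check that the server is reachable and healthy; restarting its Druid process may help.",
        action, server, consecutiveFailedAttemptCount, unstableSeconds
    );
  }

  /** Records a successful sync, which ends the unstable period. */
  void markSuccess() {
    consecutiveFailedAttemptCount = 0;
  }

  /** Milliseconds since the first of the current run of failures, or 0 if not failing. */
  long getUnstableTimeMillis() {
    return consecutiveFailedAttemptCount <= 0 ? 0 : clockMillis.getAsLong() - unstableSinceMillis;
  }
}
```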



##########
server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHttpSyncer.java:
##########
@@ -144,69 +150,91 @@ public void stop()
   {
     synchronized (startStopLock) {
       if (!startStopLock.canStop()) {
-        throw new ISE("Can't stop ChangeRequestHttpSyncer[%s].", logIdentity);
+        throw new ISE("Could not stop sync for server[%s].", logIdentity);
       }
       try {
-        log.info("Stopping ChangeRequestHttpSyncer[%s].", logIdentity);
+        log.info("Stopping sync for server[%s].", logIdentity);
       }
       finally {
         startStopLock.exitStop();
       }
 
-      log.info("Stopped ChangeRequestHttpSyncer[%s].", logIdentity);
+      log.info("Stopped sync for server[%s].", logIdentity);
     }
   }
 
-  /** Wait for first fetch of segment listing from server. */
-  public boolean awaitInitialization(long timeout, TimeUnit timeUnit) throws InterruptedException
+  /**
+   * Waits for the first successful sync with this server up to {@link #maxDurationToWaitForSync}.
+   */
+  public boolean awaitInitialization() throws InterruptedException
   {
-    return initializationLatch.await(timeout, timeUnit);
+    return initializationLatch.await(maxDurationToWaitForSync.getMillis(), TimeUnit.MILLISECONDS);
   }
 
   /**
-   * This method returns the debugging information for printing, must not be used for any other purpose.
+   * Waits upto 1 millisecond for the first successful sync with this server.
    */
-  public Map<String, Object> getDebugInfo()
+  public boolean isInitialized() throws InterruptedException
   {
-    long currTime = System.currentTimeMillis();
+    return initializationLatch.await(1, TimeUnit.MILLISECONDS);
+  }
 
-    Object notSuccessfullySyncedFor;
-    if (lastSuccessfulSyncTime == 0) {
-      notSuccessfullySyncedFor = "Never Successfully Synced";
-    } else {
-      notSuccessfullySyncedFor = (currTime - lastSuccessfulSyncTime) / 1000;
-    }
+  /**
+   * Returns debugging information for printing, must not be used for any other purpose.
+   */
+  public Map<String, Object> getDebugInfo()
+  {
     return ImmutableMap.of(
-        "notSyncedForSecs", lastSyncTime == 0 ? "Never Synced" : (currTime - lastSyncTime) / 1000,
-        "notSuccessfullySyncedFor", notSuccessfullySyncedFor,
+        "millisSinceLastRequest", sinceLastSyncRequest.millisElapsed(),
+        "millisSinceLastSuccess", sinceLastSyncSuccess.millisElapsed(),
         "consecutiveFailedAttemptCount", consecutiveFailedAttemptCount,
         "syncScheduled", startStopLock.isStarted()
     );
   }
 
   /**
-   * Exposed for monitoring use to see if sync is working fine and not stopped due to any coding bugs. If this
-   * ever returns false then caller of this method must create an alert and it should be looked into for any
-   * bugs.
+   * Whether this syncer should be reset. This method returning true typically
+   * indicates a problem with the sync scheduler.
+   *
+   * @return true if the delay since the last request to the server (or since
+   * syncer start in case of no request to the server) has exceeded
+   * {@link #maxDelayBetweenSyncRequests}.
    */
-  public boolean isOK()
+  public boolean needsReset()
   {
-    return (System.currentTimeMillis() - lastSyncTime) < MAX_RETRY_BACKOFF + 3 * serverHttpTimeout;
+    if (sinceLastSyncRequest.isRunning()) {
+      return sinceLastSyncRequest.hasElapsed(maxDelayBetweenSyncRequests);
+    } else {
+      return sinceSyncerStart.hasElapsed(maxDelayBetweenSyncRequests);
+    }
   }
 
-  public long getServerHttpTimeout()
+  public long getUnstableTimeMillis()
   {
-    return serverHttpTimeout;
+    return consecutiveFailedAttemptCount <= 0 ? 0 : sinceUnstable.millisElapsed();
+  }
+
+  /**
+   * @return true if there have been no sync failures recently and the last
+   * successful sync was not more than {@link #maxDurationToWaitForSync} ago.
+   */
+  public boolean isSyncedSuccessfully()
+  {
+    if (consecutiveFailedAttemptCount > 0) {
+      return false;
+    } else {
+      return sinceLastSyncSuccess.hasNotElapsed(maxDurationToWaitForSync);
+    }
   }
 
   private void sync()
   {
     if (!startStopLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
-      log.info("Skipping sync() call for server[%s].", logIdentity);
+      log.info("Skipping sync for server[%s] as lifecycle has not started.", logIdentity);

Review Comment:
   These log messages are also a bit cryptic. I wonder if we can make them more meaningful. For example, what does "lifecycle" even mean here?
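The health checks in this hunk (`awaitInitialization()`, `isInitialized()`, `needsReset()`, `isSyncedSuccessfully()`) can be modeled with plain JDK types. The model below is a minimal, hypothetical sketch and is not Druid's code. It replaces Druid's `Stopwatch` with timestamps from an injectable clock, and the class `SyncHealth` and its `onRequestSent`/`onSuccess`/`onFailure` hooks are invented for illustration. It differs from the diff in one deliberate way. `isInitialized()` checks `CountDownLatch.getCount()` instead of calling `await(1, TimeUnit.MILLISECONDS)`, so it never blocks and does not need to declare `InterruptedException`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/**
 * Hypothetical model of the syncer health checks; not Druid's ChangeRequestHttpSyncer.
 * Times are milliseconds from an injectable clock. Single-threaded for clarity.
 */
class SyncHealth {
  private final LongSupplier clockMillis;
  private final long maxDelayBetweenSyncRequestsMillis;
  private final long maxDurationToWaitForSyncMillis;
  private final long syncerStartMillis;
  private final CountDownLatch initializationLatch = new CountDownLatch(1);

  private long lastRequestMillis = -1; // -1: no request sent yet
  private long lastSuccessMillis = -1; // -1: never synced successfully
  private int consecutiveFailedAttemptCount = 0;

  SyncHealth(LongSupplier clockMillis, long maxDelayBetweenSyncRequestsMillis, long maxDurationToWaitForSyncMillis) {
    this.clockMillis = clockMillis;
    this.maxDelayBetweenSyncRequestsMillis = maxDelayBetweenSyncRequestsMillis;
    this.maxDurationToWaitForSyncMillis = maxDurationToWaitForSyncMillis;
    this.syncerStartMillis = clockMillis.getAsLong();
  }

  void onRequestSent() {
    lastRequestMillis = clockMillis.getAsLong();
  }

  void onSuccess() {
    lastSuccessMillis = clockMillis.getAsLong();
    consecutiveFailedAttemptCount = 0;
    initializationLatch.countDown(); // no-op after the first success
  }

  void onFailure() {
    consecutiveFailedAttemptCount++;
  }

  /** Blocks up to maxDurationToWaitForSync for the first successful sync. */
  boolean awaitInitialization() throws InterruptedException {
    return initializationLatch.await(maxDurationToWaitForSyncMillis, TimeUnit.MILLISECONDS);
  }

  /** Non-blocking check for the first successful sync. */
  boolean isInitialized() {
    return initializationLatch.getCount() == 0;
  }

  /** True if no request was sent for too long, measured from syncer start if none was ever sent. */
  boolean needsReset() {
    long reference = lastRequestMillis >= 0 ? lastRequestMillis : syncerStartMillis;
    return clockMillis.getAsLong() - reference >= maxDelayBetweenSyncRequestsMillis;
  }

  /** True if there are no recent failures and the last success is recent enough. */
  boolean isSyncedSuccessfully() {
    if (consecutiveFailedAttemptCount > 0 || lastSuccessMillis < 0) {
      return false;
    }
    return clockMillis.getAsLong() - lastSuccessMillis < maxDurationToWaitForSyncMillis;
  }
}
```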



##########
server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java:
##########
@@ -204,15 +214,26 @@ private DruidServer toDruidServer(DiscoveryDruidNode node)
             }
         );
 
-        scheduleSyncMonitoring();
+        ScheduledExecutors.scheduleAtFixedRate(
+            executor,
+            Duration.standardSeconds(60),
+            Duration.standardMinutes(5),
+            this::checkAndResetUnhealthyServers
+        );

Review Comment:
   This could run with a fixed delay instead of at a fixed rate.
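The suggestion can be illustrated with the JDK's `ScheduledExecutorService`. The sketch below is hypothetical: `FixedDelayScheduling.scheduleCheck` is an invented helper, and the diff itself goes through Druid's `ScheduledExecutors` utility. `scheduleAtFixedRate` tries to keep a fixed cadence, so when one run overruns its period the next run starts late and then follows immediately. `scheduleWithFixedDelay` always waits the full delay after the previous run finishes, which suits a check such as `checkAndResetUnhealthyServers` whose duration depends on the number of servers. With either method, an exception that escapes the task suppresses all later runs, so the wrapper catches it.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of Druid. */
class FixedDelayScheduling {
  /**
   * Runs {@code check} repeatedly, waiting {@code delayMillis} after each run
   * finishes before starting the next one.
   */
  static ScheduledExecutorService scheduleCheck(Runnable check, long initialDelayMillis, long delayMillis) {
    ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
    exec.scheduleWithFixedDelay(
        () -> {
          try {
            check.run();
          }
          catch (RuntimeException e) {
            // An exception escaping a periodic task cancels all later runs,
            // so catch it here to keep the check alive.
            System.err.println("Periodic check failed: " + e);
          }
        },
        initialDelayMillis,
        delayMillis,
        TimeUnit.MILLISECONDS
    );
    return exec;
  }
}
```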



##########
server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java:
##########
@@ -443,61 +454,57 @@ public Map<String, Object> getDebugInfo()
     return result;
   }
 
-  private void scheduleSyncMonitoring()
-  {
-    executor.scheduleAtFixedRate(
-        () -> {
-          log.debug("Running the Sync Monitoring.");
-
-          try {
-            syncMonitoring();
-          }
-          catch (Exception ex) {
-            if (ex instanceof InterruptedException) {
-              Thread.currentThread().interrupt();
-            } else {
-              log.makeAlert(ex, "Exception in sync monitoring.").emit();
-            }
-          }
-        },
-        1,
-        5,
-        TimeUnit.MINUTES
-    );
-  }
-
   @VisibleForTesting
-  void syncMonitoring()
+  void checkAndResetUnhealthyServers()
   {
     // Ensure that the collection is not being modified during iteration. Iterate over a copy
     final Set<Map.Entry<String, DruidServerHolder>> serverEntrySet = ImmutableSet.copyOf(servers.entrySet());
     for (Map.Entry<String, DruidServerHolder> e : serverEntrySet) {
       DruidServerHolder serverHolder = e.getValue();
-      if (!serverHolder.syncer.isOK()) {
+      if (serverHolder.syncer.needsReset()) {
         synchronized (servers) {
-          // check again that server is still there and only then reset.
+          // Reset only if the server is still present in the map
           if (servers.containsKey(e.getKey())) {
-            log.makeAlert(
-                "Server[%s] is not syncing properly. Current state is [%s]. Resetting it.",
+            log.warn(
+                "Resetting server[%s] with state[%s] as it is not syncing properly.",
                 serverHolder.druidServer.getName(),
                 serverHolder.syncer.getDebugInfo()
-            ).emit();
+            );
             serverRemoved(serverHolder.druidServer);
-            serverAdded(new DruidServer(
-                serverHolder.druidServer.getName(),
-                serverHolder.druidServer.getHostAndPort(),
-                serverHolder.druidServer.getHostAndTlsPort(),
-                serverHolder.druidServer.getMaxSize(),
-                serverHolder.druidServer.getType(),
-                serverHolder.druidServer.getTier(),
-                serverHolder.druidServer.getPriority()
-            ));
+            serverAdded(serverHolder.druidServer.copyWithoutSegments());
           }
         }
       }
     }
   }
 
+  private void emitServerStatusMetrics()
+  {
+    final ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent.builder();
+    try {
+      final Map<String, DruidServerHolder> serversCopy = ImmutableMap.copyOf(servers);
+      serversCopy.forEach((serverName, serverHolder) -> {
+        final DruidServer server = serverHolder.druidServer;
+        eventBuilder.setDimension("tier", server.getTier());
+        eventBuilder.setDimension("server", serverName);
+
+        final boolean isSynced = serverHolder.syncer.isSyncedSuccessfully();
+        serviceEmitter.emit(
+            eventBuilder.build("segment/serverview/sync/healthy", isSynced ? 1 : 0)
+        );
+        final long unstableTimeMillis = serverHolder.syncer.getUnstableTimeMillis();
+        if (unstableTimeMillis > 0) {
+          serviceEmitter.emit(
+              eventBuilder.build("segment/serverview/sync/unstableTime", unstableTimeMillis)
+          );
+        }

Review Comment:
   How about emitting a zero value for this metric? An absent metric makes it a bit tricky to set up alerts.
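Here is a minimal sketch of the zero-value approach, under stated assumptions. `MetricSink` is a hypothetical stand-in for Druid's `ServiceEmitter`, whose real API differs. `ServerSyncStatus` is a hypothetical snapshot that mirrors the two getters used in `emitServerStatusMetrics`. The only change from the diff is that `segment/serverview/sync/unstableTime` is emitted for every server, with 0 meaning "in sync". An alert can then tell a healthy server apart from a missing data point.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical metric sink; stands in for Druid's ServiceEmitter, whose API differs. */
interface MetricSink {
  void emit(String metric, Map<String, String> dimensions, long value);
}

/** Hypothetical snapshot of one server's sync state. */
final class ServerSyncStatus {
  final String server;
  final String tier;
  final boolean syncedSuccessfully;
  final long unstableTimeMillis;

  ServerSyncStatus(String server, String tier, boolean syncedSuccessfully, long unstableTimeMillis) {
    this.server = server;
    this.tier = tier;
    this.syncedSuccessfully = syncedSuccessfully;
    this.unstableTimeMillis = unstableTimeMillis;
  }
}

class ServerStatusMetrics {
  static void emit(List<ServerSyncStatus> servers, MetricSink sink) {
    for (ServerSyncStatus s : servers) {
      Map<String, String> dims = new LinkedHashMap<>();
      dims.put("server", s.server);
      dims.put("tier", s.tier);
      sink.emit("segment/serverview/sync/healthy", dims, s.syncedSuccessfully ? 1 : 0);
      // Emit unstableTime for every server (0 when in sync) so that a missing
      // data point can be distinguished from a healthy server.
      sink.emit("segment/serverview/sync/unstableTime", dims, s.unstableTimeMillis);
    }
  }
}
```

If this approach is adopted, the "Not emitted for synced servers." cell in the metrics table would need to change accordingly.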



##########
server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java:
##########
@@ -204,15 +214,26 @@ private DruidServer toDruidServer(DiscoveryDruidNode node)
             }
         );
 
-        scheduleSyncMonitoring();
+        ScheduledExecutors.scheduleAtFixedRate(
+            executor,
+            Duration.standardSeconds(60),
+            Duration.standardMinutes(5),
+            this::checkAndResetUnhealthyServers
+        );
+        ScheduledExecutors.scheduleAtFixedRate(
+            executor,
+            Duration.standardSeconds(30),
+            Duration.standardMinutes(1),
+            this::emitServerStatusMetrics

Review Comment:
   I just realized that one benefit of having a separate executor is that metrics get emitted even if the thread pool is busy for other reasons. Though this could be done later.
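One way to get the separate executor with the JDK alone is a dedicated single-thread `ScheduledExecutorService` that runs only the metrics task. With that setup, a long-running task on the shared `executor` cannot delay emission. `DedicatedMetricsExecutor`, its methods, and the thread name are hypothetical names chosen for this sketch. The thread is marked as a daemon so that it does not keep the JVM alive on its own. Whether that choice fits Druid's lifecycle handling is an assumption.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for illustration; not part of Druid. */
class DedicatedMetricsExecutor {
  /** Creates a single named daemon thread used only for metric emission. */
  static ScheduledExecutorService create(String threadName) {
    ThreadFactory factory = runnable -> {
      Thread t = new Thread(runnable, threadName);
      t.setDaemon(true); // do not keep the JVM alive just for metrics
      return t;
    };
    return Executors.newSingleThreadScheduledExecutor(factory);
  }

  /** Schedules emission on the dedicated executor, surviving exceptions from individual runs. */
  static void start(ScheduledExecutorService metricsExec, Runnable emitMetrics, long initialDelayMillis, long delayMillis) {
    metricsExec.scheduleWithFixedDelay(
        () -> {
          try {
            emitMetrics.run();
          }
          catch (RuntimeException e) {
            System.err.println("Metric emission failed: " + e);
          }
        },
        initialDelayMillis,
        delayMillis,
        TimeUnit.MILLISECONDS
    );
  }
}
```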



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
