This is an automated email from the ASF dual-hosted git repository.

sunithabeeram pushed a commit to branch fixReload
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 4e433dbc611e8bfc243185990091bfcf8599bf2d
Author: Sunitha Beeram <[email protected]>
AuthorDate: Tue Dec 11 10:52:13 2018 -0800

    [PINOT-7476] Add metrics to track segment refresh and reload failures
---
 .../linkedin/pinot/common/metrics/ServerMeter.java |  4 +++-
 .../core/segment/index/loader/LoaderUtils.java     |  2 +-
 .../server/starter/helix/HelixServerStarter.java   |  4 ++--
 .../helix/SegmentMessageHandlerFactory.java        | 25 +++++++++++++++++-----
 4 files changed, 26 insertions(+), 9 deletions(-)

diff --git 
a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java
index 4cda98d..44193fd 100644
--- 
a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java
@@ -54,7 +54,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   NUM_SEGMENTS_QUERIED("numSegmentsQueried", false),
   NUM_SEGMENTS_PROCESSED("numSegmentsProcessed", false),
   NUM_SEGMENTS_MATCHED("numSegmentsMatched", false),
-  NUM_MISSING_SEGMENTS("segments", false);
+  NUM_MISSING_SEGMENTS("segments", false),
+  RELOAD_FAILURES("segments", false),
+  REFRESH_FAILURES("segments", false);
 
   private final String meterName;
   private final String unit;
diff --git 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java
 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java
index b8c30e4..d69130b 100644
--- 
a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java
+++ 
b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java
@@ -82,7 +82,7 @@ public class LoaderUtils {
    * Try to recover a segment from reload failures (reloadSegment() method in 
HelixInstanceDataManager). This has no
    * effect for normal segments.
    * <p>Reload failures include normal failures like Java exceptions (called 
in reloadSegment() finally block) and hard
-   * failures such as server restart during reload and JVM crush (called 
before trying to load segment from the index
+   * failures such as server restart during reload and JVM crash (called 
before trying to load segment from the index
    * directory).
    * <p>The following failure scenarios could happen (use atomic renaming 
operation to classify scenarios):
    * <ul>
diff --git 
a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java
 
b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java
index f63725e..3cb99bd 100644
--- 
a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java
+++ 
b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java
@@ -163,13 +163,13 @@ public class HelixServerStarter {
     _adminApiApplication.start(adminApiPort);
     setAdminApiPort(adminApiPort);
 
+    final ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     // Register message handler factory
     SegmentMessageHandlerFactory messageHandlerFactory =
-        new SegmentMessageHandlerFactory(fetcherAndLoader, 
_serverInstance.getInstanceDataManager());
+        new SegmentMessageHandlerFactory(fetcherAndLoader, 
_serverInstance.getInstanceDataManager(), serverMetrics);
     _helixManager.getMessagingService()
         
.registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), 
messageHandlerFactory);
 
-    final ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
     serverMetrics.addCallbackGauge("helix.connected", () -> 
_helixManager.isConnected() ? 1L : 0L);
     _helixManager.addPreConnectCallback(
         () -> 
serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 
1L));
diff --git 
a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
 
b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index ddaf778..73df7ad 100644
--- 
a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ 
b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -17,6 +17,8 @@ package com.linkedin.pinot.server.starter.helix;
 
 import com.linkedin.pinot.common.messages.SegmentRefreshMessage;
 import com.linkedin.pinot.common.messages.SegmentReloadMessage;
+import com.linkedin.pinot.common.metrics.ServerMeter;
+import com.linkedin.pinot.common.metrics.ServerMetrics;
 import com.linkedin.pinot.core.data.manager.InstanceDataManager;
 import java.util.concurrent.Semaphore;
 import org.apache.helix.NotificationContext;
@@ -37,11 +39,13 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
 
   private final SegmentFetcherAndLoader _fetcherAndLoader;
   private final InstanceDataManager _instanceDataManager;
+  private final ServerMetrics _metrics;
 
   public SegmentMessageHandlerFactory(SegmentFetcherAndLoader fetcherAndLoader,
-      InstanceDataManager instanceDataManager) {
+      InstanceDataManager instanceDataManager, ServerMetrics metrics) {
     _fetcherAndLoader = fetcherAndLoader;
     _instanceDataManager = instanceDataManager;
+    _metrics = metrics;
     int maxParallelRefreshThreads = 
instanceDataManager.getMaxParallelRefreshThreads();
     if (maxParallelRefreshThreads > 0) {
       _refreshThreadSemaphore = new Semaphore(maxParallelRefreshThreads, true);
@@ -75,9 +79,9 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
     String msgSubType = message.getMsgSubType();
     switch (msgSubType) {
       case SegmentRefreshMessage.REFRESH_SEGMENT_MSG_SUB_TYPE:
-        return new SegmentRefreshMessageHandler(new 
SegmentRefreshMessage(message), context);
+        return new SegmentRefreshMessageHandler(new 
SegmentRefreshMessage(message), _metrics, context);
       case SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE:
-        return new SegmentReloadMessageHandler(new 
SegmentReloadMessage(message), context);
+        return new SegmentReloadMessageHandler(new 
SegmentReloadMessage(message), _metrics, context);
       default:
         throw new UnsupportedOperationException("Unsupported user defined 
message sub type: " + msgSubType);
     }
@@ -97,12 +101,15 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
   private class SegmentRefreshMessageHandler extends MessageHandler {
     private final String _segmentName;
     private final String _tableNameWithType;
+    private final ServerMetrics _metrics;
     private final Logger _logger;
 
-    public SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage, 
NotificationContext context) {
+    public SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage, 
ServerMetrics metrics,
+        NotificationContext context) {
       super(refreshMessage, context);
       _segmentName = refreshMessage.getPartitionName();
       _tableNameWithType = refreshMessage.getResourceName();
+      _metrics = metrics;
       _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + 
SegmentRefreshMessageHandler.class);
     }
 
@@ -115,6 +122,8 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
         // The number of retry times depends on the retry count in 
SegmentOperations.
         _fetcherAndLoader.addOrReplaceOfflineSegment(_tableNameWithType, 
_segmentName);
         result.setSuccess(true);
+      } catch (Exception e) {
+        _metrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REFRESH_FAILURES, 1);
       } finally {
         releaseSema();
       }
@@ -130,12 +139,15 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
   private class SegmentReloadMessageHandler extends MessageHandler {
     private final String _segmentName;
     private final String _tableNameWithType;
+    private final ServerMetrics _metrics;
     private final Logger _logger;
 
-    public SegmentReloadMessageHandler(SegmentReloadMessage 
segmentReloadMessage, NotificationContext context) {
+    public SegmentReloadMessageHandler(SegmentReloadMessage 
segmentReloadMessage, ServerMetrics metrics,
+        NotificationContext context) {
       super(segmentReloadMessage, context);
       _segmentName = segmentReloadMessage.getPartitionName();
       _tableNameWithType = segmentReloadMessage.getResourceName();
+      _metrics = metrics;
       _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + 
SegmentReloadMessageHandler.class);
     }
 
@@ -146,6 +158,8 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
       try {
         if (_segmentName.equals("")) {
           acquireSema("ALL", _logger);
+          // NOTE: the method aborts if any segment reload encounters an unhandled exception, which can leave
+          // segments in an inconsistent state
           _instanceDataManager.reloadAllSegments(_tableNameWithType);
         } else {
           // Reload one segment
@@ -154,6 +168,7 @@ public class SegmentMessageHandlerFactory implements 
MessageHandlerFactory {
         }
         helixTaskResult.setSuccess(true);
       } catch (Throwable e) {
+        _metrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.RELOAD_FAILURES, 1);
         // catch all Errors and Exceptions: if we only catch Exception, Errors 
go completely unhandled
         // (without any corresponding logs to indicate failure!) in the 
callable path
         throw new RuntimeException(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to