This is an automated email from the ASF dual-hosted git repository. sunithabeeram pushed a commit to branch fixReload in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4e433dbc611e8bfc243185990091bfcf8599bf2d Author: Sunitha Beeram <[email protected]> AuthorDate: Tue Dec 11 10:52:13 2018 -0800 [PINOT-7476] Add metrics to track cases where segment refresh/reloads fail --- .../linkedin/pinot/common/metrics/ServerMeter.java | 4 +++- .../core/segment/index/loader/LoaderUtils.java | 2 +- .../server/starter/helix/HelixServerStarter.java | 4 ++-- .../helix/SegmentMessageHandlerFactory.java | 25 +++++++++++++++++----- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java index 4cda98d..44193fd 100644 --- a/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/com/linkedin/pinot/common/metrics/ServerMeter.java @@ -54,7 +54,9 @@ public enum ServerMeter implements AbstractMetrics.Meter { NUM_SEGMENTS_QUERIED("numSegmentsQueried", false), NUM_SEGMENTS_PROCESSED("numSegmentsProcessed", false), NUM_SEGMENTS_MATCHED("numSegmentsMatched", false), - NUM_MISSING_SEGMENTS("segments", false); + NUM_MISSING_SEGMENTS("segments", false), + RELOAD_FAILURES("segments", false), + REFRESH_FAILURES("segments", false); private final String meterName; private final String unit; diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java index b8c30e4..d69130b 100644 --- a/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java +++ b/pinot-core/src/main/java/com/linkedin/pinot/core/segment/index/loader/LoaderUtils.java @@ -82,7 +82,7 @@ public class LoaderUtils { * Try to recover a segment from reload failures (reloadSegment() method in HelixInstanceDataManager). This has no * effect for normal segments. 
* <p>Reload failures include normal failures like Java exceptions (called in reloadSegment() finally block) and hard - * failures such as server restart during reload and JVM crush (called before trying to load segment from the index + * failures such as server restart during reload and JVM crash (called before trying to load segment from the index * directory). * <p>The following failure scenarios could happen (use atomic renaming operation to classify scenarios): * <ul> diff --git a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java index f63725e..3cb99bd 100644 --- a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java +++ b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/HelixServerStarter.java @@ -163,13 +163,13 @@ public class HelixServerStarter { _adminApiApplication.start(adminApiPort); setAdminApiPort(adminApiPort); + final ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); // Register message handler factory SegmentMessageHandlerFactory messageHandlerFactory = - new SegmentMessageHandlerFactory(fetcherAndLoader, _serverInstance.getInstanceDataManager()); + new SegmentMessageHandlerFactory(fetcherAndLoader, _serverInstance.getInstanceDataManager(), serverMetrics); _helixManager.getMessagingService() .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), messageHandlerFactory); - final ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); serverMetrics.addCallbackGauge("helix.connected", () -> _helixManager.isConnected() ? 
1L : 0L); _helixManager.addPreConnectCallback( () -> serverMetrics.addMeteredGlobalValue(ServerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); diff --git a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java index ddaf778..73df7ad 100644 --- a/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java +++ b/pinot-server/src/main/java/com/linkedin/pinot/server/starter/helix/SegmentMessageHandlerFactory.java @@ -17,6 +17,8 @@ package com.linkedin.pinot.server.starter.helix; import com.linkedin.pinot.common.messages.SegmentRefreshMessage; import com.linkedin.pinot.common.messages.SegmentReloadMessage; +import com.linkedin.pinot.common.metrics.ServerMeter; +import com.linkedin.pinot.common.metrics.ServerMetrics; import com.linkedin.pinot.core.data.manager.InstanceDataManager; import java.util.concurrent.Semaphore; import org.apache.helix.NotificationContext; @@ -37,11 +39,13 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { private final SegmentFetcherAndLoader _fetcherAndLoader; private final InstanceDataManager _instanceDataManager; + private final ServerMetrics _metrics; public SegmentMessageHandlerFactory(SegmentFetcherAndLoader fetcherAndLoader, - InstanceDataManager instanceDataManager) { + InstanceDataManager instanceDataManager, ServerMetrics metrics) { _fetcherAndLoader = fetcherAndLoader; _instanceDataManager = instanceDataManager; + _metrics = metrics; int maxParallelRefreshThreads = instanceDataManager.getMaxParallelRefreshThreads(); if (maxParallelRefreshThreads > 0) { _refreshThreadSemaphore = new Semaphore(maxParallelRefreshThreads, true); @@ -75,9 +79,9 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { String msgSubType = message.getMsgSubType(); switch (msgSubType) { case 
SegmentRefreshMessage.REFRESH_SEGMENT_MSG_SUB_TYPE: - return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), context); + return new SegmentRefreshMessageHandler(new SegmentRefreshMessage(message), _metrics, context); case SegmentReloadMessage.RELOAD_SEGMENT_MSG_SUB_TYPE: - return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), context); + return new SegmentReloadMessageHandler(new SegmentReloadMessage(message), _metrics, context); default: throw new UnsupportedOperationException("Unsupported user defined message sub type: " + msgSubType); } @@ -97,12 +101,15 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { private class SegmentRefreshMessageHandler extends MessageHandler { private final String _segmentName; private final String _tableNameWithType; + private final ServerMetrics _metrics; private final Logger _logger; - public SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage, NotificationContext context) { + public SegmentRefreshMessageHandler(SegmentRefreshMessage refreshMessage, ServerMetrics metrics, + NotificationContext context) { super(refreshMessage, context); _segmentName = refreshMessage.getPartitionName(); _tableNameWithType = refreshMessage.getResourceName(); + _metrics = metrics; _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + SegmentRefreshMessageHandler.class); } @@ -115,6 +122,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { // The number of retry times depends on the retry count in SegmentOperations. 
_fetcherAndLoader.addOrReplaceOfflineSegment(_tableNameWithType, _segmentName); result.setSuccess(true); + } catch (Exception e) { + _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REFRESH_FAILURES, 1); } finally { releaseSema(); } @@ -130,12 +139,15 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { private class SegmentReloadMessageHandler extends MessageHandler { private final String _segmentName; private final String _tableNameWithType; + private final ServerMetrics _metrics; private final Logger _logger; - public SegmentReloadMessageHandler(SegmentReloadMessage segmentReloadMessage, NotificationContext context) { + public SegmentReloadMessageHandler(SegmentReloadMessage segmentReloadMessage, ServerMetrics metrics, + NotificationContext context) { super(segmentReloadMessage, context); _segmentName = segmentReloadMessage.getPartitionName(); _tableNameWithType = segmentReloadMessage.getResourceName(); + _metrics = metrics; _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + SegmentReloadMessageHandler.class); } @@ -146,6 +158,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { try { if (_segmentName.equals("")) { acquireSema("ALL", _logger); + // NOTE: the method aborts if any segment reload encounters an unhandled exception - can lead to inconsistent + // state across segments _instanceDataManager.reloadAllSegments(_tableNameWithType); } else { // Reload one segment @@ -154,6 +168,7 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory { } helixTaskResult.setSuccess(true); } catch (Throwable e) { + _metrics.addMeteredTableValue(_tableNameWithType, ServerMeter.RELOAD_FAILURES, 1); // catch all Errors and Exceptions: if we only catch Exception, Errors go completely unhandled // (without any corresponding logs to indicate failure!)
in the callable path throw new RuntimeException( --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
