This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch add-logic-for-lead-controller-resource in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit e87ee3966a57cb6943620f78a2b518961a09656a Author: jackjlli <[email protected]> AuthorDate: Mon Jun 10 22:30:24 2019 -0700 Add logic for leveraging lead controller resource --- .../realtime/LLRealtimeSegmentDataManager.java | 4 +- .../manager/realtime/RealtimeTableDataManager.java | 2 +- .../realtime/SegmentBuildTimeLeaseExtender.java | 8 +- .../server/realtime/ControllerLeaderLocator.java | 106 ++++++++++++++++++--- .../ServerSegmentCompletionProtocolHandler.java | 7 +- .../realtime/LLRealtimeSegmentDataManagerTest.java | 2 +- .../realtime/ControllerLeaderLocatorTest.java | 19 +++- .../tests/SegmentCompletionIntegrationTests.java | 2 +- 8 files changed, 120 insertions(+), 30 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 59d2bb8..2e2aced 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1028,6 +1028,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore(); _segmentZKMetadata = (LLCRealtimeSegmentZKMetadata) segmentZKMetadata; _tableConfig = tableConfig; + _tableNameWithType = _tableConfig.getTableName(); _realtimeTableDataManager = realtimeTableDataManager; _resourceDataDir = resourceDataDir; _indexLoadingConfig = indexLoadingConfig; @@ -1036,7 +1037,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segmentVersion = indexLoadingConfig.getSegmentVersion(); _instanceId = _realtimeTableDataManager.getServerInstance(); _leaseExtender = SegmentBuildTimeLeaseExtender.getLeaseExtender(_instanceId); - _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics); + _protocolHandler = new ServerSegmentCompletionProtocolHandler(_serverMetrics, _tableNameWithType); // TODO Validate configs IndexingConfig indexingConfig = _tableConfig.getIndexingConfig(); @@ -1046,7 +1047,6 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { _segmentNameStr = _segmentZKMetadata.getSegmentName(); _segmentName = new LLCSegmentName(_segmentNameStr); _streamPartitionId = _segmentName.getPartitionId(); - _tableNameWithType = _tableConfig.getTableName(); _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName(); _metricKeyName = _tableNameWithType + "-" + _streamTopic + "-" + _streamPartitionId; segmentLogger = LoggerFactory.getLogger(LLRealtimeSegmentDataManager.class.getName() + "_" + _segmentNameStr); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index cde73a4..8b81dc6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -89,7 +89,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { @Override protected void doInit() { - _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics); + _leaseExtender = SegmentBuildTimeLeaseExtender.create(_instanceId, _serverMetrics, _tableNameWithType); File statsFile = new File(_tableDataDir, STATS_FILE_NAME); try { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java index 6676619..fcd6422 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentBuildTimeLeaseExtender.java @@ -57,20 +57,20 @@ public class SegmentBuildTimeLeaseExtender { } public static synchronized SegmentBuildTimeLeaseExtender create(final String instanceId, - ServerMetrics serverMetrics) { + ServerMetrics serverMetrics, String tableNameWithType) { SegmentBuildTimeLeaseExtender leaseExtender = INSTANCE_TO_LEASE_EXTENDER.get(instanceId); if (leaseExtender != null) { LOGGER.warn("Instance already exists"); } else { - leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics); + leaseExtender = new SegmentBuildTimeLeaseExtender(instanceId, serverMetrics, tableNameWithType); INSTANCE_TO_LEASE_EXTENDER.put(instanceId, leaseExtender); } return leaseExtender; } - private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics) { + private SegmentBuildTimeLeaseExtender(String instanceId, ServerMetrics serverMetrics, String tableNameWithType) { _instanceId = instanceId; - _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics); + _protocolHandler = new ServerSegmentCompletionProtocolHandler(serverMetrics, tableNameWithType); _executor = new ScheduledThreadPoolExecutor(1); } diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java index 8a3b0fb..07b0087 100644 --- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java +++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ControllerLeaderLocator.java @@ -19,15 +19,20 @@ package org.apache.pinot.server.realtime; import com.google.common.annotations.VisibleForTesting; +import java.util.Map; +import java.util.Set; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; +import org.apache.helix.model.ExternalView; import org.apache.pinot.core.query.utils.Pair; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME; + // Helix keeps the old controller around for 30s before electing a new one, so we will keep getting // the old controller as leader, and it will keep returning NOT_LEADER. @@ -78,35 +83,106 @@ public class ControllerLeaderLocator { /** * Locate the controller leader so that we can send LLC segment completion requests to it. * Checks the {@link ControllerLeaderLocator::_cachedControllerLeaderInvalid} flag and fetches the leader from helix if cached value is invalid - * + * @param rawTableName table name without type. * @return The host:port string of the current controller leader. */ - public synchronized Pair<String, Integer> getControllerLeader() { + public synchronized Pair<String, Integer> getControllerLeader(String rawTableName) { if (!_cachedControllerLeaderInvalid) { return _controllerLeaderHostPort; } + String leaderForTable = getLeaderForTable(rawTableName); + if (leaderForTable == null) { + LOGGER.warn("Failed to find a leader for Table: {}", rawTableName); + _cachedControllerLeaderInvalid = true; + return null; + } else { + _controllerLeaderHostPort = generateControllerLeaderHostPortPair(leaderForTable); + _cachedControllerLeaderInvalid = false; + LOGGER.info("Setting controller leader to be {}:{}", _controllerLeaderHostPort.getFirst(), + _controllerLeaderHostPort.getSecond()); + return _controllerLeaderHostPort; + } + } + + /** + * If partition leader exists, use this as the leader for realtime segment completion. + * Otherwise, try to use Helix leader. + * @param rawTableName table name without type + * @return the leader for this table. + */ + private String getLeaderForTable(String rawTableName) { + String leaderForTable; + ExternalView leadControllerResourceExternalView = _helixManager.getClusterManagmentTool().getResourceExternalView(_clusterName, LEAD_CONTROLLER_RESOURCE_NAME); + String partitionLeader = getPartitionLeader(leadControllerResourceExternalView, rawTableName); + if (partitionLeader != null) { + leaderForTable = partitionLeader; + } else { + // Get Helix leader to be the leader to this table. + String helixLeader = getHelixClusterLeader(); + if (helixLeader != null) { + leaderForTable = helixLeader; + } else { + leaderForTable = null; + } + } + return leaderForTable; + } + + /** + * Gets partition leader from lead controller resource. + * If the resource is disabled or no controller registered as participant, there is no instance in "MASTER" state. + * + * @param leadControllerResourceExternalView external view of lead controller resource + * @param rawTableName table name without type + * @return leader of partition, null if not found. + */ + private String getPartitionLeader(ExternalView leadControllerResourceExternalView, String rawTableName) { + if (leadControllerResourceExternalView == null) { + return null; + } + Set<String> partitionSet = leadControllerResourceExternalView.getPartitionSet(); + if (partitionSet == null || partitionSet.isEmpty()) { + return null; + } + int numPartitions = partitionSet.size(); + int partitionIndex = rawTableName.hashCode() % numPartitions; + String partitionName = LEAD_CONTROLLER_RESOURCE_NAME + "_" + partitionIndex; + Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partitionName); + + for (Map.Entry<String, String> entry : stateMap.entrySet()) { + if ("MASTER".equals(entry.getValue())) { + return entry.getKey(); + } + } + return null; + } + + /** + * Gets Helix leader in the cluster. Null if there is no leader. + * @return Helix leader. + */ + private String getHelixClusterLeader() { BaseDataAccessor<ZNRecord> dataAccessor = _helixManager.getHelixDataAccessor().getBaseDataAccessor(); Stat stat = new Stat(); try { - ZNRecord znRecord = - dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST); - String leader = znRecord.getId(); - int index = leader.lastIndexOf('_'); - String leaderHost = leader.substring(0, index); - int leaderPort = Integer.valueOf(leader.substring(index + 1)); - _controllerLeaderHostPort = new Pair<>(leaderHost, leaderPort); - _cachedControllerLeaderInvalid = false; - LOGGER.info("Setting controller leader to be {}:{} as per znode version {}, mtime {}", leaderHost, leaderPort, - stat.getVersion(), stat.getMtime()); - return _controllerLeaderHostPort; + ZNRecord znRecord = dataAccessor.get("/" + _clusterName + "/CONTROLLER/LEADER", stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST); + String helixLeader = znRecord.getId(); + LOGGER.info("Getting Helix leader: {} as per znode version {}, mtime {}", helixLeader, stat.getVersion(), stat.getMtime()); + return helixLeader; } catch (Exception e) { - LOGGER.warn("Could not locate controller leader, exception", e); - _cachedControllerLeaderInvalid = true; + LOGGER.warn("Could not locate Helix leader", e); return null; } } + private Pair<String, Integer> generateControllerLeaderHostPortPair(String controllerLeaderId) { + int index = controllerLeaderId.lastIndexOf('_'); + String leaderHost = controllerLeaderId.substring(0, index); + int leaderPort = Integer.valueOf(controllerLeaderId.substring(index + 1)); + return new Pair<>(leaderHost, leaderPort); + } + /** * Invalidates the cached controller leader value by setting the {@link ControllerLeaderLocator::_cacheControllerLeadeInvalid} flag. * This flag is always checked first by {@link ControllerLeaderLocator::getControllerLeader()} method before returning the leader. If set, leader is fetched from helix, else cached leader value is returned. diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java index bcfa7ee..5c75f1e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java @@ -23,6 +23,7 @@ import java.net.URI; import java.util.Map; import javax.net.ssl.SSLContext; import org.apache.commons.configuration.Configuration; +import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; @@ -51,6 +52,7 @@ public class ServerSegmentCompletionProtocolHandler { private final FileUploadDownloadClient _fileUploadDownloadClient; private final ServerMetrics _serverMetrics; + private final String _rawTableName; public static void init(Configuration uploaderConfig) { Configuration httpsConfig = uploaderConfig.subset(HTTPS_PROTOCOL); @@ -62,9 +64,10 @@ public class ServerSegmentCompletionProtocolHandler { uploaderConfig.getInt(CONFIG_OF_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS, DEFAULT_SEGMENT_UPLOAD_REQUEST_TIMEOUT_MS); } - public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics) { + public ServerSegmentCompletionProtocolHandler(ServerMetrics serverMetrics, String tableNameWithType) { _fileUploadDownloadClient = new FileUploadDownloadClient(_sslContext); _serverMetrics = serverMetrics; + _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); } public SegmentCompletionProtocol.Response segmentCommitStart(SegmentCompletionProtocol.Request.Params params) { @@ -159,7 +162,7 @@ public class ServerSegmentCompletionProtocolHandler { private String createSegmentCompletionUrl(SegmentCompletionProtocol.Request request) { ControllerLeaderLocator leaderLocator = ControllerLeaderLocator.getInstance(); - final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader(); + final Pair<String, Integer> leaderHostPort = leaderLocator.getControllerLeader(_rawTableName); if (leaderHostPort == null) { LOGGER.warn("No leader found while trying to send {}", request.toString()); return null; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index 3d7bfce..8ff15b8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -111,7 +111,7 @@ public class LLRealtimeSegmentDataManagerTest { private RealtimeTableDataManager createTableDataManager() { final String instanceId = "server-1"; - SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry())); + SegmentBuildTimeLeaseExtender.create(instanceId, new ServerMetrics(new MetricsRegistry()), _tableName); RealtimeTableDataManager tableDataManager = mock(RealtimeTableDataManager.class); when(tableDataManager.getServerInstance()).thenReturn(instanceId); RealtimeSegmentStatsHistory statsHistory = mock(RealtimeSegmentStatsHistory.class); diff --git a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java index 34e955d..5f0cdbc 100644 --- a/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/server/realtime/ControllerLeaderLocatorTest.java @@ -19,6 +19,7 @@ package org.apache.pinot.server.realtime; import org.apache.helix.BaseDataAccessor; +import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; @@ -35,6 +36,7 @@ import static org.mockito.Mockito.when; public class ControllerLeaderLocatorTest { + private String testTable = "testTable"; /** * Tests the invalidate logic for cached controller leader @@ -45,6 +47,7 @@ public class ControllerLeaderLocatorTest { HelixManager helixManager = mock(HelixManager.class); HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class); + HelixAdmin helixAdmin = mock(HelixAdmin.class); ZNRecord znRecord = mock(ZNRecord.class); final String leaderHost = "host"; final int leaderPort = 12345; @@ -54,6 +57,8 @@ public class ControllerLeaderLocatorTest { when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort); when(baseDataAccessor.get(anyString(), any(), anyInt())).thenReturn(znRecord); when(helixManager.getClusterName()).thenReturn("testCluster"); + when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin); + when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null); // Create Controller Leader Locator FakeControllerLeaderLocator.create(helixManager); @@ -78,7 +83,7 @@ public class ControllerLeaderLocatorTest { Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis); // getControllerLeader, which validates the cache - controllerLeaderLocator.getControllerLeader(); + controllerLeaderLocator.getControllerLeader(testTable); Assert.assertFalse(controllerLeaderLocator.isCachedControllerLeaderInvalid()); Assert.assertEquals(controllerLeaderLocator.getLastCacheInvalidateMillis(), lastCacheInvalidateMillis); @@ -104,16 +109,19 @@ public class ControllerLeaderLocatorTest { HelixManager helixManager = mock(HelixManager.class); HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class); + HelixAdmin helixAdmin = mock(HelixAdmin.class); when(helixManager.getHelixDataAccessor()).thenReturn(helixDataAccessor); when(helixDataAccessor.getBaseDataAccessor()).thenReturn(baseDataAccessor); when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenThrow(new RuntimeException()); + when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin); + when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null); // Create Controller Leader Locator FakeControllerLeaderLocator.create(helixManager); ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance(); - Assert.assertEquals(controllerLeaderLocator.getControllerLeader(), null); + Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable), null); } @Test @@ -121,6 +129,7 @@ public class ControllerLeaderLocatorTest { HelixManager helixManager = mock(HelixManager.class); HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class); BaseDataAccessor<ZNRecord> baseDataAccessor = mock(BaseDataAccessor.class); + HelixAdmin helixAdmin = mock(HelixAdmin.class); ZNRecord znRecord = mock(ZNRecord.class); final String leaderHost = "host"; final int leaderPort = 12345; @@ -130,14 +139,16 @@ public class ControllerLeaderLocatorTest { when(znRecord.getId()).thenReturn(leaderHost + "_" + leaderPort); when(baseDataAccessor.get(anyString(), (Stat) any(), anyInt())).thenReturn(znRecord); when(helixManager.getClusterName()).thenReturn("myCluster"); + when(helixManager.getClusterManagmentTool()).thenReturn(helixAdmin); + when(helixAdmin.getResourceExternalView(anyString(), anyString())).thenReturn(null); // Create Controller Leader Locator FakeControllerLeaderLocator.create(helixManager); ControllerLeaderLocator controllerLeaderLocator = FakeControllerLeaderLocator.getInstance(); Pair<String, Integer> expectedLeaderLocation = new Pair<>(leaderHost, leaderPort); - Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getFirst(), expectedLeaderLocation.getFirst()); - Assert.assertEquals(controllerLeaderLocator.getControllerLeader().getSecond(), expectedLeaderLocation.getSecond()); + Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getFirst(), expectedLeaderLocation.getFirst()); + Assert.assertEquals(controllerLeaderLocator.getControllerLeader(testTable).getSecond(), expectedLeaderLocation.getSecond()); } static class FakeControllerLeaderLocator extends ControllerLeaderLocator { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java index 7e7b041..f6cd400 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTests.java @@ -134,7 +134,7 @@ public class SegmentCompletionIntegrationTests extends LLCRealtimeClusterIntegra // Now report to the controller that we had to stop consumption ServerSegmentCompletionProtocolHandler protocolHandler = - new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry())); + new ServerSegmentCompletionProtocolHandler(new ServerMetrics(new MetricsRegistry()), realtimeTableName); SegmentCompletionProtocol.Request.Params params = new SegmentCompletionProtocol.Request.Params(); params.withOffset(45688L).withSegmentName(_currentSegment).withReason("RandomReason") .withInstanceId(_serverInstance); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
