This is an automated email from the ASF dual-hosted git repository. jamesshao pushed a commit to branch upsert-refactor in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 63e52fe517621accc74a99d5a24e6246c8fc8cb0 Author: james Shao <[email protected]> AuthorDate: Tue Mar 17 17:46:31 2020 -0700 basic split of logics --- .../pinot/broker/broker/BrokerServerBuilder.java | 12 +- .../broker/broker/helix/HelixBrokerStarter.java | 35 +++- .../requesthandler/BaseBrokerRequestHandler.java | 47 ++++- .../requesthandler/LowWaterMarkQueryWriter.java | 151 ++++++++++++++ .../SingleConnectionBrokerRequestHandler.java | 8 +- .../broker/upsert/DefaultLowWaterMarkService.java | 28 ++- .../broker/upsert/LowWaterMarkServiceProvider.java | 39 ++-- .../LowWaterMarkQueryWriterTest.java | 130 ++++++++++++ .../apache/pinot/common/utils/CommonConstants.java | 2 + .../helix/core/PinotHelixResourceManager.java | 12 ++ ...rkManager.java => DefaultWaterMarkManager.java} | 15 +- ...ermarkManager.java => LowWaterMarkService.java} | 21 +- .../segment/updater}/SegmentDeletionHandler.java | 3 +- .../segment/updater}/UpsertComponentContainer.java | 19 +- ...WatermarkManager.java => WaterMarkManager.java} | 3 +- pinot-grigio/pinot-grigio-provided/pom.xml | 38 +++- .../upsert/PollingBasedLowWaterMarkService.java | 224 +++++++++++++++++++++ .../UpsertImmutableIndexSegmentCallback.java | 8 +- .../upsert/UpsertMutableIndexSegmentCallback.java | 6 +- .../pinot/core/segment/updater/SegmentUpdater.java | 4 +- .../updater/UpsertComponentContainerImpl.java | 151 ++++++++++++++ ...arkManager.java => UpsertWaterMarkManager.java} | 14 +- .../PollingBasedLowWaterMarkServiceTest.java | 215 ++++++++++++++++++++ .../UpsertImmutableIndexSegmentCallbackTest.java | 6 +- .../tests/ClusterIntegrationTestUtils.java | 2 + .../api/resources/LowWatermarksResource.java | 62 ++++++ .../server/api/resources/UpsertDebugResource.java | 84 ++++++++ .../org/apache/pinot/server/conf/ServerConf.java | 15 +- .../pinot/server/starter/ServerInstance.java | 39 +++- .../starter/helix/HelixInstanceDataManager.java | 7 - .../server/starter/helix/HelixServerStarter.java | 67 ++++-- .../SegmentOnlineOfflineStateModelFactory.java | 14 +- .../upsert/DefaultUpsertComponentContainer.java | 36 +++- .../upsert/UpsertComponentContainerProvider.java | 18 +- .../api/resources/LowWatermarksResourceTest.java | 23 ++- .../realtime/provisioning/MemoryEstimator.java | 17 +- 36 files changed, 1424 insertions(+), 151 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java index 103d469..c30dd51 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerServerBuilder.java @@ -19,8 +19,8 @@ package org.apache.pinot.broker.broker; import com.google.common.base.Preconditions; +import org.apache.pinot.core.segment.updater.LowWaterMarkService; import com.yammer.metrics.core.MetricsRegistry; -import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.configuration.Configuration; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -36,6 +36,8 @@ import org.apache.pinot.common.utils.CommonConstants.Helix; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.atomic.AtomicReference; + public class BrokerServerBuilder { private static final Logger LOGGER = LoggerFactory.getLogger(BrokerServerBuilder.class); @@ -57,9 +59,11 @@ public class BrokerServerBuilder { private final BrokerMetrics _brokerMetrics; private final BrokerRequestHandler _brokerRequestHandler; private final BrokerAdminApiApplication _brokerAdminApplication; + private final LowWaterMarkService _lwmService; public BrokerServerBuilder(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService, - QueryQuotaManager queryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore) { + QueryQuotaManager queryQuotaManager, ZkHelixPropertyStore<ZNRecord> propertyStore, + LowWaterMarkService lowWaterMarkService) { _config = config; _delayedShutdownTimeMs = config.getLong(Broker.CONFIG_OF_DELAY_SHUTDOWN_TIME_MS, Broker.DEFAULT_DELAY_SHUTDOWN_TIME_MS); @@ -77,8 +81,10 @@ public class BrokerServerBuilder { _brokerMetrics.initializeGlobalMeters(); _brokerRequestHandler = new SingleConnectionBrokerRequestHandler(_config, _routingTable, _timeBoundaryService, _accessControlFactory, - queryQuotaManager, _brokerMetrics, _propertyStore); + queryQuotaManager, _brokerMetrics, _propertyStore, lowWaterMarkService); _brokerAdminApplication = new BrokerAdminApiApplication(this); + + _lwmService = lowWaterMarkService; } public void start() { diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java index 6986182..0e127ad 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/HelixBrokerStarter.java @@ -20,11 +20,6 @@ package org.apache.pinot.broker.broker.helix; import com.google.common.collect.ImmutableList; import com.yammer.metrics.core.MetricsRegistry; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.helix.ConfigAccessor; @@ -49,6 +44,8 @@ import org.apache.pinot.broker.broker.BrokerServerBuilder; import org.apache.pinot.broker.queryquota.HelixExternalViewBasedQueryQuotaManager; import org.apache.pinot.broker.requesthandler.BrokerRequestHandler; import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting; +import org.apache.pinot.core.segment.updater.LowWaterMarkService; +import org.apache.pinot.broker.upsert.LowWaterMarkServiceProvider; import org.apache.pinot.common.Utils; import org.apache.pinot.common.config.TagNameUtils; import org.apache.pinot.common.metadata.ZKMetadataProvider; @@ -62,6 +59,12 @@ import org.apache.pinot.common.utils.ServiceStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + @SuppressWarnings("unused") public class HelixBrokerStarter { @@ -93,6 +96,8 @@ public class HelixBrokerStarter { private HelixManager _participantHelixManager; private TimeboundaryRefreshMessageHandlerFactory _tbiMessageHandler; + private LowWaterMarkService _lwmService; + public HelixBrokerStarter(Configuration brokerConf, String clusterName, String zkServer) throws Exception { this(brokerConf, clusterName, zkServer, null); @@ -168,6 +173,11 @@ public class HelixBrokerStarter { _helixDataAccessor = _spectatorHelixManager.getHelixDataAccessor(); ConfigAccessor configAccessor = _spectatorHelixManager.getConfigAccessor(); + // start lwm service + LowWaterMarkServiceProvider provider = new LowWaterMarkServiceProvider(_brokerConf, + _spectatorHelixManager.getHelixDataAccessor(), _clusterName); + _lwmService = provider.getInstance(); + // Set up the broker server builder LOGGER.info("Setting up broker server builder"); _helixExternalViewBasedRouting = @@ -184,13 +194,17 @@ public class HelixBrokerStarter { String enableQueryLimitOverride = configAccessor.get(helixConfigScope, Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE); _brokerConf.setProperty(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.valueOf(enableQueryLimitOverride)); _brokerServerBuilder = new BrokerServerBuilder(_brokerConf, _helixExternalViewBasedRouting, - _helixExternalViewBasedRouting.getTimeBoundaryService(), _helixExternalViewBasedQueryQuotaManager, _propertyStore); + _helixExternalViewBasedRouting.getTimeBoundaryService(), _helixExternalViewBasedQueryQuotaManager, + _propertyStore, _lwmService); BrokerRequestHandler brokerRequestHandler = _brokerServerBuilder.getBrokerRequestHandler(); BrokerMetrics brokerMetrics = _brokerServerBuilder.getBrokerMetrics(); _helixExternalViewBasedRouting.setBrokerMetrics(brokerMetrics); _helixExternalViewBasedQueryQuotaManager.setBrokerMetrics(brokerMetrics); _brokerServerBuilder.start(); + // start lwm service + _lwmService.start(brokerMetrics); + // Initialize the cluster change mediator LOGGER.info("Initializing cluster change mediator"); for (ClusterChangeHandler externalViewChangeHandler : _externalViewChangeHandlers) { @@ -241,6 +255,7 @@ public class HelixBrokerStarter { _participantHelixManager .addPreConnectCallback(() -> brokerMetrics.addMeteredGlobalValue(BrokerMeter.HELIX_ZOOKEEPER_RECONNECTS, 1L)); + // Register the service status handler registerServiceStatusHandler(); @@ -315,6 +330,10 @@ public class HelixBrokerStarter { _spectatorHelixManager.disconnect(); } + if (_lwmService != null) { + LOGGER.info("Shutting down low water mark service"); + _lwmService.shutDown(); + } LOGGER.info("Finish shutting down Pinot broker"); } @@ -347,6 +366,10 @@ public class HelixBrokerStarter { return new HelixBrokerStarter(brokerConf, "quickstart", "localhost:2122"); } + public LowWaterMarkService getLwmService() { + return _lwmService; + } + public static void main(String[] args) throws Exception { getDefault().start(); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 7667e61..1d8f97b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.core.segment.updater.LowWaterMarkService; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; import org.apache.helix.ZNRecord; @@ -76,6 +77,10 @@ import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.pinot.common.utils.CommonConstants.Broker.CONFIG_OF_BROKER_LWM_REWRITE_ENABLE; +import static org.apache.pinot.common.utils.CommonConstants.Broker.CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT; +import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.DISABLE_REWRITE; + @ThreadSafe public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { @@ -87,6 +92,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final AccessControlFactory _accessControlFactory; protected final QueryQuotaManager _queryQuotaManager; protected final BrokerMetrics _brokerMetrics; + protected final LowWaterMarkService _lwmService; protected final AtomicLong _requestIdGenerator = new AtomicLong(); protected final BrokerRequestOptimizer _brokerRequestOptimizer = new BrokerRequestOptimizer(); @@ -96,6 +102,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { protected final long _brokerTimeoutMs; protected final int _queryResponseLimit; protected final int _queryLogLength; + protected final boolean _enableQueryRewrite; private final RateLimiter _queryLogRateLimiter; @@ -108,13 +115,15 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { public BaseBrokerRequestHandler(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory, - QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore) { + QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore, + LowWaterMarkService lowWaterMarkService) { _config = config; _routingTable = routingTable; _timeBoundaryService = timeBoundaryService; _accessControlFactory = accessControlFactory; _queryQuotaManager = queryQuotaManager; _brokerMetrics = brokerMetrics; + _lwmService = lowWaterMarkService; _enableCaseInsensitivePql = _config.getBoolean(CommonConstants.Helix.ENABLE_CASE_INSENSITIVE_PQL_KEY, false); if (_enableCaseInsensitivePql) { @@ -125,6 +134,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { _enableQueryLimitOverride = _config.getBoolean(Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, false); + // query rewrite for upsert feature + _enableQueryRewrite = config.getBoolean(CONFIG_OF_BROKER_LWM_REWRITE_ENABLE, + CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT); + _brokerId = config.getString(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId()); _brokerTimeoutMs = config.getLong(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS); _queryResponseLimit = @@ -285,6 +298,11 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME); } + if (shouldEnableLowWaterMarkRewrite(request)) { + // Augment the realtime request with LowWaterMark constraints. + addLowWaterMarkToQuery(realtimeBrokerRequest, rawTableName); + } + // Calculate routing table for the query long routingStartTimeNs = System.nanoTime(); Map<ServerInstance, List<String>> offlineRoutingTable = null; @@ -368,6 +386,21 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { return brokerResponse; } + private boolean shouldEnableLowWaterMarkRewrite(JsonNode request) { + if (_enableQueryRewrite) { + try { + if (request.has(DISABLE_REWRITE)) { + return !request.get(DISABLE_REWRITE).asBoolean(); + } else { + return true; + } + } catch (Exception ex) { + LOGGER.warn("cannot parse the disable rewrite option: [{}] to boolean from request json", DISABLE_REWRITE, ex); + } + } + return false; + } + /** * Reset limit for selection query if it exceeds maxQuerySelectionLimit. * @param brokerRequest @@ -753,6 +786,18 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } } + private void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, String rawTableName) { + final String realtimeTableName = rawTableName + "_REALTIME"; + Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName); + if (lowWaterMarks == null || lowWaterMarks.size() == 0) { + LOGGER.info("No low water marks info found for table {}", realtimeTableName); + return; + } + LOGGER.info("Found low water marks {} for table {}", String.valueOf(lowWaterMarks), realtimeTableName); + LowWaterMarkQueryWriter.addLowWaterMarkToQuery(realtimeBrokerRequest, lowWaterMarks); + LOGGER.info("Query augmented with LWMS info for table {} : {}", realtimeTableName, realtimeBrokerRequest); + } + /** * Processes the optimized broker requests for both OFFLINE and REALTIME table. */ diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java new file mode 100644 index 0000000..c3285e5 --- /dev/null +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.FilterOperator; +import org.apache.pinot.common.request.FilterQuery; +import org.apache.pinot.common.request.FilterQueryMap; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// Add a lwm query to the filter query of a Pinot query for upsert enabled table. +// Thread-Safe +public class LowWaterMarkQueryWriter { + private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkQueryWriter.class); + private static final String VALID_FROM = "$validFrom"; + private static final String VALID_UNTIL = "$validUntil"; + // Normal Pinot query node uses positive IDs. So lwm query node ids are all negative. + private static final int QUERY_ID_BASE = -1000; + + /** + * For upsert enabled tables, augment the realtime query with low water mark constraints in its filter query of the + * form + * ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1) + * + * @param realtimeBrokerRequest + * @param lowWaterMarks + */ + public static void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) { + if (lowWaterMarks == null || lowWaterMarks.size() == 0) { + LOGGER.warn("No low water mark info found for query: {}", realtimeBrokerRequest); + return; + } + + // Choose the min lwm among all partitions. + long minLwm = Collections.min(lowWaterMarks.values()); + + // 1. Build the low water mark query of the form for a table assuming lwm is the min LWM and -1 is used as + // uninitialized marker. + // ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1) + // -1 is used instead of Long.MAXVALUE because Pinot does not handle long arithmetic correctly. + FilterQuery lwmQuery = addSinglePartitionLowWaterMark(QUERY_ID_BASE - 1, realtimeBrokerRequest, minLwm); + + // 2. Attach low water mark filter to the current filters. + FilterQuery currentFilterQuery = realtimeBrokerRequest.getFilterQuery(); + if (currentFilterQuery != null) { + // Make an AND query of lwmQuery and the existing query. + FilterQuery andFilterQuery = new FilterQuery(); + // Make sure we do not reuse any query id in lwmQuerys. + andFilterQuery.setId(QUERY_ID_BASE); + andFilterQuery.setOperator(FilterOperator.AND); + List<Integer> nestedFilterQueryIds = new ArrayList<>(2); + nestedFilterQueryIds.add(currentFilterQuery.getId()); + nestedFilterQueryIds.add(lwmQuery.getId()); + andFilterQuery.setNestedFilterQueryIds(nestedFilterQueryIds); + + realtimeBrokerRequest.setFilterQuery(andFilterQuery); + FilterQueryMap filterSubQueryMap = realtimeBrokerRequest.getFilterSubQueryMap(); + filterSubQueryMap.putToFilterQueryMap(lwmQuery.getId(), lwmQuery); + filterSubQueryMap.putToFilterQueryMap(andFilterQuery.getId(), andFilterQuery); + } else { + realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery); + realtimeBrokerRequest.setFilterQuery(lwmQuery); + } + } + + /** + * + * @param queryIdBase The starting id that will be assigned to the first query created in ths method. + * @param realtimeBrokerRequest + * @param lwm low water mark. + * @return a filter query corresponding to the low water mark constraint of a single partition. The general form is: + * ($ValidFrom <= lwm && $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1) + */ + private static FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest, + Long lwm) { + // ValidFromQuery: ($ValidFrom <= lwm && $validFrom > -1) + FilterQuery validFromFilterQuery = new FilterQuery(); + // Important: Always decrement queryIdBase value after use to avoid id conflict. + validFromFilterQuery.setId(queryIdBase--); + validFromFilterQuery.setOperator(FilterOperator.AND); + FilterQuery validFromP1 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(*\t\t" + lwm + "]", FilterOperator.RANGE, realtimeBrokerRequest); + FilterQuery validFromP2 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(-1\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest); + List<Integer> nestedQueriesIdForValidFrom = new ArrayList<>(); + nestedQueriesIdForValidFrom.add(validFromP1.getId()); + nestedQueriesIdForValidFrom.add(validFromP2.getId()); + validFromFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidFrom); + + // ValidUtilQuery: (lwm < $validUtil OR $validUtil = -1) + FilterQuery validUtilFilterQuery = new FilterQuery(); + validUtilFilterQuery.setId(queryIdBase--); + validUtilFilterQuery.setOperator(FilterOperator.OR); + + FilterQuery validUtilP1 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "(" + lwm + "\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest); + FilterQuery validUtilP2 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "-1", FilterOperator.EQUALITY, realtimeBrokerRequest); + List<Integer> nestedQueriesIdForValidUtil = new ArrayList<>(); + nestedQueriesIdForValidUtil.add(validUtilP1.getId()); + nestedQueriesIdForValidUtil.add(validUtilP2.getId()); + validUtilFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidUtil); + + // Top level query: ValidFromQuery AND ValidUtilQuery + FilterQuery lwmQuery = new FilterQuery(); + lwmQuery.setId(queryIdBase--); + lwmQuery.setOperator(FilterOperator.AND); + List<Integer> nestQids = new ArrayList<>(); + nestQids.add(validFromFilterQuery.getId()); + nestQids.add(validUtilFilterQuery.getId()); + lwmQuery.setNestedFilterQueryIds(nestQids); + + // Add all the new created queries to the query map. + realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery); + realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validFromFilterQuery.getId(), validFromFilterQuery); + realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validUtilFilterQuery.getId(), validUtilFilterQuery); + return lwmQuery; + } + + private static FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator, + BrokerRequest realtimeBrokerRequest) { + FilterQuery filterQuery = new FilterQuery(); + filterQuery.setColumn(column); + filterQuery.setId(id); + filterQuery.setValue(Collections.singletonList(value)); + filterQuery.setOperator(operator); + if (realtimeBrokerRequest.getFilterSubQueryMap() == null) { + realtimeBrokerRequest.setFilterSubQueryMap(new FilterQueryMap()); + } + realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(id, filterQuery); + return filterQuery; + } +} diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java index 844a200..95f943f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; + +import org.apache.pinot.core.segment.updater.LowWaterMarkService; import org.apache.commons.configuration.Configuration; import org.apache.helix.ZNRecord; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -57,8 +59,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl public SingleConnectionBrokerRequestHandler(Configuration config, RoutingTable routingTable, TimeBoundaryService timeBoundaryService, AccessControlFactory accessControlFactory, - QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore) { - super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics, propertyStore); + QueryQuotaManager queryQuotaManager, BrokerMetrics brokerMetrics, ZkHelixPropertyStore<ZNRecord> propertyStore, + LowWaterMarkService lowWaterMarkService) { + super(config, routingTable, timeBoundaryService, accessControlFactory, queryQuotaManager, brokerMetrics, + propertyStore, lowWaterMarkService); _queryRouter = new QueryRouter(_brokerId, brokerMetrics); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java similarity index 57% copy from pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java copy to pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java index 63424ac..94b3be4 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java @@ -16,34 +16,32 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.server.upsert; +package org.apache.pinot.broker.upsert; -import com.codahale.metrics.MetricRegistry; -import org.apache.commons.configuration.Configuration; +import com.google.common.collect.ImmutableMap; +import org.apache.helix.HelixDataAccessor; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.core.segment.updater.LowWaterMarkService; -public class DefaultUpsertComponentContainer implements UpsertComponentContainer { +import java.util.Map; - private final SegmentDeletionHandler deletionHandler = new SegmentDeletionHandler(); +public class DefaultLowWaterMarkService implements LowWaterMarkService { @Override - public void registerMetrics(MetricRegistry registry) { - - } - - @Override - public void init(Configuration config) { + public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, + int serverPort){ } @Override - public SegmentDeletionHandler getSegmentDeletionHandler() { - return deletionHandler; + public Map<Integer, Long> getLowWaterMarks(String tableName) { + return ImmutableMap.of(); } @Override - public void start() { + public void shutDown() { } @Override - public void stop() { + public void start(BrokerMetrics brokerMetrics) { } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java similarity index 50% copy from pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java copy to pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java index fe4af07..f5c06f3 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/LowWaterMarkServiceProvider.java @@ -16,43 +16,48 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.server.upsert; +package org.apache.pinot.broker.upsert; import com.google.common.base.Preconditions; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.pinot.core.segment.updater.WatermarkManager; -import org.apache.pinot.grigio.common.metrics.GrigioMetrics; +import org.apache.helix.HelixDataAccessor; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.core.segment.updater.LowWaterMarkService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class UpsertComponentContainerProvider { +import static org.apache.pinot.common.utils.CommonConstants.Broker.*; - private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerProvider.class); - public static final String UPSERT_COMPONENT_CONFIG_KEY = "watermarkManager.class"; - public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName(); +public class LowWaterMarkServiceProvider { - private final Configuration _conf; - private UpsertComponentContainer _instance; + private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkServiceProvider.class); - public UpsertComponentContainerProvider(Configuration conf, GrigioMetrics metrics) { - _conf = conf; - String className = _conf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT); + private LowWaterMarkService _instance; + + public LowWaterMarkServiceProvider(Configuration brokerConfig, HelixDataAccessor dataAccessor, String clusterName) { + String className = brokerConfig.getString(CommonConstants.Broker.CONFIG_OF_BROKER_LWMS_CLASS_NAME, + DefaultLowWaterMarkService.class.getName()); LOGGER.info("creating watermark manager with class {}", className); try { - Class<UpsertComponentContainer> comonentContainerClass = (Class<UpsertComponentContainer>) Class.forName(className); - Preconditions.checkState(comonentContainerClass.isAssignableFrom(WatermarkManager.class), - "configured class not assignable from Callback class"); + Class<LowWaterMarkService> comonentContainerClass = (Class<LowWaterMarkService>) Class.forName(className); + Preconditions.checkState(comonentContainerClass.isAssignableFrom(LowWaterMarkService.class), + "configured class not assignable from LowWaterMarkService class"); _instance = comonentContainerClass.newInstance(); - _instance.init(_conf); + _instance.init(dataAccessor, clusterName, + brokerConfig.getInt(CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS, + DEFAULT_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS), + brokerConfig.getInt(CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT, + CommonConstants.Server.DEFAULT_ADMIN_API_PORT)); } catch (Exception e) { LOGGER.error("failed to load watermark manager class", className, e); + _instance = null; ExceptionUtils.rethrow(e); } } - public UpsertComponentContainer getInstance() { + public LowWaterMarkService getInstance() { return _instance; } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java new file mode 100644 index 0000000..1021477 --- /dev/null +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.requesthandler; + +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.common.request.FilterOperator; +import org.apache.pinot.common.request.FilterQuery; +import org.apache.pinot.pql.parsers.Pql2Compiler; +import org.apache.thrift.TException; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +public class LowWaterMarkQueryWriterTest { + @Test + public void testRewriteQueryWithoutExistingFilters() throws Exception{ + Pql2Compiler pql2Compiler = new Pql2Compiler(); + BrokerRequest req = pql2Compiler.compileToBrokerRequest("SELECT * FROM T"); + Assert.assertFalse(req.isSetFilterQuery()); + Map<Integer, Long> lwms = new HashMap<>(); + lwms.put(0, 10L); + lwms.put(1, 20L); + LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms); + Assert.assertTrue(req.isSetFilterQuery()); + try { + req.validate(); + } catch (TException e) { + Assert.fail("Query after low water mark query is not valid: ", e); + } + // Verify there are in total 7 filter query nodes in the filter query tree. + Map<Integer,FilterQuery> filterSubQueryMap = req.getFilterSubQueryMap().getFilterQueryMap(); + Assert.assertEquals(filterSubQueryMap.size(), 7); + + Integer lwmQueryId = req.getFilterQuery().getId(); + // 1. Verify the low water mark query. + FilterQuery lwmQuery = filterSubQueryMap.get(lwmQueryId); + verifyNoneTerminalFilterQuery(lwmQuery, FilterOperator.AND, 2); + FilterQuery validFrom1Query = filterSubQueryMap.get(lwmQuery.getNestedFilterQueryIds().get(0)); + FilterQuery validTo1Query = filterSubQueryMap.get(lwmQuery.getNestedFilterQueryIds().get(1)); + + // Verify the subtree (i.e., an AND with two nodes) for the $validFrom column. + verifyNoneTerminalFilterQuery(validFrom1Query, FilterOperator.AND, 2); + verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(0)), + "$validFrom", "(*\t\t10]", FilterOperator.RANGE); + verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(1)), + "$validFrom", "(-1\t\t*)", FilterOperator.RANGE); + + // Verify the subtree (i.e., an OR with two nodes) for the $validutil column. + verifyNoneTerminalFilterQuery(validTo1Query, FilterOperator.OR, 2); + verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(0)), + "$validUntil", "(10\t\t*)", FilterOperator.RANGE); + verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(1)), + "$validUntil", "-1", FilterOperator.EQUALITY); + } + + @Test + public void testRewriteQueryWithExistingFilters() { + Pql2Compiler pql2Compiler = new Pql2Compiler(); + BrokerRequest req = pql2Compiler.compileToBrokerRequest("SELECT * FROM T WHERE A < 4"); + Assert.assertTrue(req.isSetFilterQuery()); + Map<Integer, Long> lwms = new HashMap<>(); + lwms.put(0, 10L); + lwms.put(1, 20L); + LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms); + Assert.assertTrue(req.isSetFilterQuery()); + try { + req.validate(); + } catch (TException e) { + Assert.fail("Query after low water mark query is not valid: ", e); + } + // Verify there are in total 9 filter query nodes in the filter query tree. + Map<Integer,FilterQuery> filterSubQueryMap = req.getFilterSubQueryMap().getFilterQueryMap(); + Assert.assertEquals(filterSubQueryMap.size(), 9); + // 0. Verify there are one top level filter of operator OR with two sub filter queries. + FilterQuery rootFilterQuery = req.getFilterQuery(); + verifyNoneTerminalFilterQuery(rootFilterQuery, FilterOperator.AND, 2); + // 1. Verify the existing filter query A < 4 is not affected. + verifyTerminalFilterQuery(filterSubQueryMap.get(rootFilterQuery.getNestedFilterQueryIds().get(0)), "A", "(*\t\t4)", FilterOperator.RANGE); + + FilterQuery lowWaterMarkQuery = filterSubQueryMap.get(rootFilterQuery.getNestedFilterQueryIds().get(1)); + // Verify the lwm query + verifyNoneTerminalFilterQuery(lowWaterMarkQuery, FilterOperator.AND, 2); + FilterQuery validFrom1Query = filterSubQueryMap.get(lowWaterMarkQuery.getNestedFilterQueryIds().get(0)); + FilterQuery validTo1Query = filterSubQueryMap.get(lowWaterMarkQuery.getNestedFilterQueryIds().get(1)); + + // Verify the subtree (i.e., an AND with two nodes) for the $validFrom column. + verifyNoneTerminalFilterQuery(validFrom1Query, FilterOperator.AND, 2); + verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(0)), + "$validFrom", "(*\t\t10]", FilterOperator.RANGE); + verifyTerminalFilterQuery(filterSubQueryMap.get(validFrom1Query.getNestedFilterQueryIds().get(1)), + "$validFrom", "(-1\t\t*)", FilterOperator.RANGE); + + // Verify the subtree (i.e., an OR with two nodes) for the $validutil column. + verifyNoneTerminalFilterQuery(validTo1Query, FilterOperator.OR, 2); + verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(0)), + "$validUntil", "(10\t\t*)", FilterOperator.RANGE); + verifyTerminalFilterQuery(filterSubQueryMap.get(validTo1Query.getNestedFilterQueryIds().get(1)), + "$validUntil", "-1", FilterOperator.EQUALITY); + } + + private void verifyTerminalFilterQuery(FilterQuery filterQuery, String column, String value, FilterOperator op) { + Assert.assertEquals(filterQuery.getColumn(), column); + Assert.assertEquals(filterQuery.getValue(), Collections.singletonList(value)); + Assert.assertEquals(filterQuery.getOperator(), op); + } + + private void verifyNoneTerminalFilterQuery(FilterQuery filterQuery, FilterOperator op, int numOfChildQueries) { + Assert.assertEquals(filterQuery.getOperator(), op); + Assert.assertEquals(filterQuery.getNestedFilterQueryIdsSize(), numOfChildQueries); + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index a6f242c..0e65779 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -175,7 +175,9 @@ public class CommonConstants { public static final String CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE = "pinot.broker.enable.query.limit.override"; public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS = "pinot.broker.query.polling.server.lwms.interval.ms"; + public static final int DEFAULT_OF_BROKER_POLLING_SERVER_LWMS_INTERVAL_MS = 5 * 1_000; public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT = "pinot.broker.query.polling.server.lwms.port"; + public static final String CONFIG_OF_BROKER_LWMS_CLASS_NAME = "pinot.broker.lwms.classname"; public static final String CONFIG_OF_BROKER_LWM_REWRITE_ENABLE = "pinot.broker.query.lwm.rewrite"; public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = true; public static class Request { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index a494e3f..9cce40d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -112,6 +112,18 @@ import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; public class PinotHelixResourceManager { private static final Logger LOGGER = LoggerFactory.getLogger(PinotHelixResourceManager.class); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java similarity index 71% copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java index 01579e7..b45060c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/DefaultWaterMarkManager.java @@ -18,16 +18,23 @@ */ package org.apache.pinot.core.segment.updater; +import com.google.common.collect.ImmutableMap; import org.apache.commons.configuration.Configuration; -import org.apache.pinot.grigio.common.metrics.GrigioMeter; import org.apache.pinot.grigio.common.metrics.GrigioMetrics; import java.util.Map; -public interface WatermarkManager { +public class DefaultWaterMarkManager implements WaterMarkManager { - void init(Configuration config, GrigioMetrics metrics); + private static final Map<String, Map<Integer, Long>> DEFAULT_MAP = ImmutableMap.of(); - Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap(); + @Override + public void init(Configuration config, GrigioMetrics metrics) { + } + + @Override + public Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap() { + return DEFAULT_MAP; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java similarity index 56% copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java index 01579e7..2fd7434 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java @@ -18,16 +18,25 @@ */ package org.apache.pinot.core.segment.updater; -import org.apache.commons.configuration.Configuration; -import org.apache.pinot.grigio.common.metrics.GrigioMeter; -import org.apache.pinot.grigio.common.metrics.GrigioMetrics; +import org.apache.helix.HelixDataAccessor; +import org.apache.pinot.common.metrics.BrokerMetrics; import java.util.Map; -public interface WatermarkManager { +/** + * LowWaterMarkService keeps records of the low water mark (i.e., the stream ingestion progress) for each partition of + * an input table. + */ +public interface LowWaterMarkService { + + void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort); - void init(Configuration config, GrigioMetrics metrics); + // Return the low water mark mapping from partition id to the corresponding low water mark of a given table. + Map<Integer, Long> getLowWaterMarks(String tableName); - Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap(); + // Shutdown the service. + void shutDown(); + // start + void start(BrokerMetrics brokerMetrics); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentDeletionHandler.java similarity index 94% rename from pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java rename to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentDeletionHandler.java index 1aba06b..1db6f84 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/SegmentDeletionHandler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/SegmentDeletionHandler.java @@ -16,10 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.server.upsert; +package org.apache.pinot.core.segment.updater; import com.google.common.collect.ImmutableList; -import org.apache.pinot.core.segment.updater.SegmentDeletionListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java similarity index 67% rename from pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java rename to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java index 7f636fc..eb64285 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainer.java @@ -16,20 +16,27 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.server.upsert; +package org.apache.pinot.core.segment.updater; -import com.codahale.metrics.MetricRegistry; +import com.yammer.metrics.core.MetricsRegistry; import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; public interface UpsertComponentContainer { - void registerMetrics(MetricRegistry registry); + void registerMetrics(String prefix, MetricsRegistry registry); - void init(Configuration config); + void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName); + + void startBackgroundThread(); + + void stopBackgroundThread(); + + void shutdown(); SegmentDeletionHandler getSegmentDeletionHandler(); - void start(); + WaterMarkManager getWatermarkManager(); - void stop(); + boolean isUpsertEnabled(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java similarity index 96% copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java index 01579e7..acc8479 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WaterMarkManager.java @@ -24,10 +24,11 @@ import org.apache.pinot.grigio.common.metrics.GrigioMetrics; import java.util.Map; -public interface WatermarkManager { +public interface WaterMarkManager { void init(Configuration config, GrigioMetrics metrics); Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap(); + } diff --git a/pinot-grigio/pinot-grigio-provided/pom.xml b/pinot-grigio/pinot-grigio-provided/pom.xml index 5643070..ec7b4ca 100644 --- a/pinot-grigio/pinot-grigio-provided/pom.xml +++ b/pinot-grigio/pinot-grigio-provided/pom.xml @@ -52,8 +52,40 @@ <artifactId>mockito-core</artifactId> <scope>test</scope> </dependency> - </dependencies> - - + <dependency> + <groupId>com.github.tomakehurst</groupId> + <artifactId>wiremock-jre8</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.ow2.asm</groupId> + <artifactId>asm</artifactId> + </exclusion> + <exclusion> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-broker</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-controller</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.pinot</groupId> + <artifactId>pinot-controller</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + </dependencies> </project> \ No newline at end of file diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java new file mode 100644 index 0000000..f7e88c9 --- /dev/null +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.upsert; + +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.manager.zk.ZkCacheBaseDataAccessor; +import org.apache.helix.model.InstanceConfig; +import org.apache.pinot.common.metrics.BrokerGauge; +import org.apache.pinot.common.metrics.BrokerMeter; +import org.apache.pinot.common.metrics.BrokerMetrics; +import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo; +import org.apache.pinot.core.segment.updater.LowWaterMarkService; +import org.glassfish.jersey.client.ClientProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.WebTarget; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +// A low water mark service which polls various Pinot servers periodically to get the low water marks for partitions of +// servers. +public class PollingBasedLowWaterMarkService implements LowWaterMarkService { + private static final Logger LOGGER = LoggerFactory.getLogger(PollingBasedLowWaterMarkService.class); + + private static final String LWMS_PATH = "lwms"; + private static final String HTTP = "http"; + + private static final int SERVER_CONNENCT_TIMEOUT_MS = 10000; + private static final int SERVER_READ_TIMEOUT = 10000; + private static final String SERVER_PREFIX = "Server_"; + + // A map from table_name to its partition->lwm mapping. + private Map<String, Map<Integer, Long>> _tableLowWaterMarks; + private ZkCacheBaseDataAccessor<ZNRecord> _cacheInstanceConfigsDataAccessor; + private Client _httpClient; + // We can tune this polling interval to make sure we get the fresh snapshot of server low water marks. + private int _serverPollingInterval; + private int _serverPort; + private boolean _shuttingDown; + private BrokerMetrics _brokerMetrics; + + @Override + public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort) { + // Construct the zk path to get the server instances. + String instanceConfigs = PropertyPathBuilder.instanceConfig(helixClusterName); + // Build a zk data reader. + _cacheInstanceConfigsDataAccessor = + new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) helixDataAccessor.getBaseDataAccessor(), + instanceConfigs, null, Collections.singletonList(instanceConfigs)); + _tableLowWaterMarks = new ConcurrentHashMap<>(); + _httpClient = ClientBuilder.newClient(); + _httpClient.property(ClientProperties.CONNECT_TIMEOUT, SERVER_CONNENCT_TIMEOUT_MS); + _httpClient.property(ClientProperties.READ_TIMEOUT, SERVER_READ_TIMEOUT); + _serverPollingInterval = serverPollingInterval; + _serverPort = serverPort; + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + shutDown(); + } catch (final Exception e) { + LOGGER.error("Caught exception while running shutdown hook", e); + } + } + }); + } + + @Override + public void start(BrokerMetrics brokerMetrics) { + _brokerMetrics = brokerMetrics; + Thread serverPollingThread = new Thread(new PinotServerPollingExecutor()); + serverPollingThread.start(); + } + + @Override + public Map<Integer, Long> getLowWaterMarks(String tableName) { + return _tableLowWaterMarks.get(tableName); + } + + @Override + public void shutDown() { + _shuttingDown = true; + } + + // Poll all the servers periodically to find out the Low Water Mark info. + private class PinotServerPollingExecutor implements Runnable { + @Override + public void run() { + while (!_shuttingDown) { + try { + Map<String, Map<Integer, Long>> latestLowWaterMarks = new ConcurrentHashMap<>(); + // 1. Find out all the alive servers. + List<String> serverInstances = _cacheInstanceConfigsDataAccessor.getChildNames("/", AccessOption.PERSISTENT); + List<ZNRecord> instances = _cacheInstanceConfigsDataAccessor.getChildren("/", null, AccessOption.PERSISTENT); + for (ZNRecord r : instances) { + LOGGER.info("Instance info for lwms: {}", r.toString()); + } + // 2. Ask each server for its low water mark info. + for (String serverIntanceId : serverInstances) { + // Check the instance is in fact a server. + if (!serverIntanceId.startsWith(SERVER_PREFIX) && !serverIntanceId.startsWith("server_")) + continue; + InstanceConfig serverConfig = InstanceConfig.toInstanceConfig(serverIntanceId.substring( + SERVER_PREFIX.length())); + try { + // (TODO) Fixing this. Hardcode using the default server admin port for now. + WebTarget webTarget = _httpClient.target(getURI(serverConfig.getHostName(), _serverPort)); + TableLowWaterMarksInfo lwms = webTarget.path(PollingBasedLowWaterMarkService.LWMS_PATH).request(). + get(TableLowWaterMarksInfo.class); + LOGGER.info("Found low water mark info for server {}: {}", serverIntanceId, lwms.getTableLowWaterMarks()); + // 3. Update the low water marks. + LwmMerger.updateLowWaterMarks(latestLowWaterMarks, lwms.getTableLowWaterMarks()); + } catch (Exception e) { + // TODO(tingchen) Handle server failures. We could keep the last known lwms of a server. + LOGGER.warn("Error during getting low water marks from server {}", serverIntanceId, e); + } + } + // 4. Replace the broker's low water marks table with the latest low water mark info. + if (validate(latestLowWaterMarks)) { + _tableLowWaterMarks = latestLowWaterMarks; + } + // 5. Sleep for some interval. + Thread.sleep(_serverPollingInterval); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // It is OK for us to break out the loop early because the Low Water Mark refresh is best effort. + break; + } + } + } + + // Validate the low water mark info polled from all the servers are right. For now, return true. + // (TODO tingchen) figure out the right checks. + private boolean validate(Map<String, Map<Integer, Long>> latestLowWaterMarks) { + if (latestLowWaterMarks == null) { + _brokerMetrics.addMeteredGlobalValue(BrokerMeter.LOW_WATER_MARK_QUERY_FAILURES, 1); + return false; + } + for(String tableName : latestLowWaterMarks.keySet()) { + Map<Integer, Long> partitionLWMs = latestLowWaterMarks.get(tableName); + _brokerMetrics.addValueToTableGauge(tableName, BrokerGauge.TABLE_MIN_LOW_WATER_MARK, + Collections.min(partitionLWMs.values())); + } + return true; + } + + private URI getURI(String host, int port) throws URISyntaxException { + LOGGER.info("requesting host {} and port {}", host, port); + return new URI(PollingBasedLowWaterMarkService.HTTP, null, host, port, null + , null, null); + } + } + + static class LwmMerger { + // Update an existing map currentLwmsMap of tableName->low_water_marks with a new map of the same type. + // If an entry in the new map does not exist in currentLwmsMap, insert it to currentLwmsMap. + // otherwise merge the entry with the existing entry in currentLwmsMap using mergeTablePartitionLwms(). + static void updateLowWaterMarks(Map<String, Map<Integer, Long>> currentLwmsMap, + final Map<String, Map<Integer, Long>> serverLwmsMap) { + for (Map.Entry<String, Map<Integer, Long>> serverMap : serverLwmsMap.entrySet()) { + String tableName = serverMap.getKey(); + Map<Integer, Long> tableLwms = serverMap.getValue(); + if (currentLwmsMap.containsKey(tableName)) { + currentLwmsMap.put(tableName, + LwmMerger.mergeTablePartitionLwms(Collections.unmodifiableMap(currentLwmsMap.get(tableName)), + tableLwms)); + } else { + currentLwmsMap.put(tableName, tableLwms); + } + } + } + + // Merge all the entries in the two input maps of partition_id->lwm. + // If an entry exists only in a map, put it in the combined map. + // If an entry exists in both maps, use the entry with the smaller low water marks. + static Map<Integer, Long> mergeTablePartitionLwms(final Map<Integer, Long> m1, final Map<Integer, Long> m2) { + if (m1 == null || m1.size() == 0) { + return m2; + } + if (m2 == null || m2.size() == 0) { + return m1; + } + Map<Integer, Long> result = new HashMap<>(m1); + for (Map.Entry<Integer, Long> entry : m2.entrySet()) { + Integer partitionNo = entry.getKey(); + Long lwm = entry.getValue(); + if (result.containsKey(partitionNo)) { + result.put(partitionNo, Math.min(lwm, result.get(partitionNo))); + } else { + result.put(partitionNo, lwm); + } + } + return result; + } + } +} diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java index 1ddc963..b7fcdb8 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallback.java @@ -29,7 +29,7 @@ import org.apache.pinot.core.io.reader.BaseSingleColumnSingleValueReader; import org.apache.pinot.core.io.reader.DataFileReader; import org.apache.pinot.core.segment.index.column.ColumnIndexContainer; import org.apache.pinot.core.segment.index.readers.Dictionary; -import org.apache.pinot.core.segment.updater.UpsertWatermarkManager; +import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager; import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter; import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry; import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntrySet; @@ -50,7 +50,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback private String _segmentName; private int _totalDoc; private long _minSourceOffset; - private UpsertWatermarkManager _upsertWatermarkManager; + private UpsertWaterMarkManager _upsertWatermarkManager; private UpdateLogStorageProvider _updateLogStorageProvider; // use array for mapping bewteen offset to docId, where actual offset = min_offset + array_index // use 4 bytes per record @@ -67,7 +67,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback CommonConstants.Helix.TableType.REALTIME); _segmentName = segmentMetadata.getName(); _totalDoc = segmentMetadata.getTotalDocs(); - _upsertWatermarkManager = UpsertWatermarkManager.getInstance(); + _upsertWatermarkManager = UpsertWaterMarkManager.getInstance(); _updateLogStorageProvider = UpdateLogStorageProvider.getInstance(); _virtualColumnsReaderWriter = new ArrayList<>(); for (DataFileReader reader: virtualColumnIndexReader.values()) { @@ -80,7 +80,7 @@ public class UpsertImmutableIndexSegmentCallback implements IndexSegmentCallback */ @VisibleForTesting protected void init(List<VirtualColumnLongValueReaderWriter> readerWriters, - int totalDoc, UpsertWatermarkManager manager, + int totalDoc, UpsertWaterMarkManager manager, UpdateLogStorageProvider updateLogStorageProvider, long minSourceOffset, int[] offsetToDocId) { _tableNameWithType = "testTable"; diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java index eeda5a6..6eae5fb 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/data/manager/upsert/UpsertMutableIndexSegmentCallback.java @@ -24,7 +24,7 @@ import org.apache.pinot.common.segment.SegmentMetadata; import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.core.io.reader.DataFileReader; import org.apache.pinot.core.segment.index.column.ColumnIndexContainer; -import org.apache.pinot.core.segment.updater.UpsertWatermarkManager; +import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager; import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter; import org.apache.pinot.grigio.common.messages.LogEventType; import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry; @@ -48,7 +48,7 @@ public class UpsertMutableIndexSegmentCallback implements IndexSegmentCallback { private String _tableName; private String _segmentName; private Schema _schema; - private UpsertWatermarkManager _upsertWatermarkManager; + private UpsertWaterMarkManager _upsertWatermarkManager; private String _offsetColumnName; private final List<VirtualColumnLongValueReaderWriter> _mutableSegmentReaderWriters = new ArrayList<>(); // use map for mapping between kafka offset and docId because we at-most have 1 mutable segment per consumer @@ -75,7 +75,7 @@ public class UpsertMutableIndexSegmentCallback implements IndexSegmentCallback { _mutableSegmentReaderWriters.add((VirtualColumnLongValueReaderWriter) reader); } } - _upsertWatermarkManager = UpsertWatermarkManager.getInstance(); + _upsertWatermarkManager = UpsertWaterMarkManager.getInstance(); LOGGER.info("starting upsert segment with {} reader writer", _mutableSegmentReaderWriters.size()); } diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java index 530cd0d..5f0aa43 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/SegmentUpdater.java @@ -91,7 +91,7 @@ public class SegmentUpdater implements SegmentDeletionListener { _topicPrefix = conf.getString(SegmentUpdaterConfig.INPUT_TOPIC_PREFIX); _updateSleepMs = conf.getInt(SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS, SegmentUpdaterConfig.SEGMENT_UDPATE_SLEEP_MS_DEFAULT); - UpsertWatermarkManager.init(metrics); + UpsertWaterMarkManager.init(metrics); _consumer = provider.getConsumer(); _ingestionExecutorService = Executors.newFixedThreadPool(1); _updateLogStorageProvider = UpdateLogStorageProvider.getInstance(); @@ -179,7 +179,7 @@ public class SegmentUpdater implements SegmentDeletionListener { if (System.currentTimeMillis() - lastReportedTime > LOGGER_TIME_GAP_MS) { lastReportedTime = System.currentTimeMillis(); LOGGER.info("processed {} messages in {} ms", eventCount, System.currentTimeMillis() - loopStartTime); - LOGGER.info("latest high water mark is {}", UpsertWatermarkManager.getInstance().toString()); + LOGGER.info("latest high water mark is {}", UpsertWaterMarkManager.getInstance().toString()); } _consumer.ackOffset(); _metrics.addTimedValueMs(GrigioTimer.SEGMENT_UPDATER_LOOP_TIME, System.currentTimeMillis() - loopStartTime); diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainerImpl.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainerImpl.java new file mode 100644 index 0000000..20758f0 --- /dev/null +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertComponentContainerImpl.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.updater; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.yammer.metrics.core.MetricsRegistry; +import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; +import org.apache.pinot.grigio.common.metrics.GrigioMetrics; +import org.apache.pinot.grigio.common.storageProvider.UpdateLogStorageProvider; +import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogRetentionManager; +import org.apache.pinot.grigio.common.storageProvider.retentionManager.UpdateLogRetentionManagerImpl; +import org.apache.pinot.grigio.common.utils.IdealStateHelper; +import org.apache.pinot.grigio.servers.GrigioServerMetrics; +import org.apache.pinot.grigio.servers.KeyCoordinatorProvider; +import org.apache.pinot.grigio.servers.SegmentUpdaterProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.pinot.common.utils.CommonConstants.Grigio.PINOT_UPSERT_SERVER_COMPONENT_PREFIX; + +public class UpsertComponentContainerImpl implements UpsertComponentContainer { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerImpl.class); + + // config keys + public static final String ENABLED_CONFIG_KEY = "enabled"; + public static final String STORAGE_CONFIG_KEY = "storage"; + public static final String HOST_NAME_CONFIG_KEY = "hostname"; + public static final String KC_CONFIG_KEY = "kc"; + public static final String UPDATER_CONFIG_KEY = "updater"; + + private volatile boolean _isUpsertEnabled = false; + private AtomicBoolean _inited = new AtomicBoolean(false); + + // members of related upsert components + private Configuration _conf; + private String _hostName; + private GrigioMetrics _grigioMetrics; + private KeyCoordinatorProvider _keyCoordinatorProvider; + private SegmentUpdaterProvider _segmentUpdaterProvider; + private SegmentUpdater _segmentUpdater; + private UpdateLogRetentionManager _retentionManager; + private SegmentDeletionHandler _segmentDeletionHandler; + private WaterMarkManager _waterMarkManager; + + @Override + public void registerMetrics(String prefix, MetricsRegistry registry) { + _grigioMetrics = new GrigioServerMetrics(prefix + PINOT_UPSERT_SERVER_COMPONENT_PREFIX, registry); + _grigioMetrics.initializeGlobalMeters(); + } + + @Override + public void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName) { + Preconditions.checkState(!_inited.getAndSet(true), "cannot initialize upsert component twice"); + _isUpsertEnabled = config.getBoolean(ENABLED_CONFIG_KEY, false); + if (_isUpsertEnabled) { + LOGGER.info("initializing upsert components"); + _conf = config; + _hostName = _conf.getString(HOST_NAME_CONFIG_KEY); + initVirtualColumnStorageProvider(config); + _keyCoordinatorProvider = buildKeyCoordinatorProvider(config, _hostName); + _segmentUpdaterProvider = buildSegmentUpdaterProvider(config, _hostName); + _retentionManager = new UpdateLogRetentionManagerImpl( + new IdealStateHelper(helixManager.getClusterManagmentTool(), clusterName), instanceName + ); + _segmentUpdater = buildSegmentUpdater(config, _segmentUpdaterProvider, _retentionManager); + UpsertWaterMarkManager.init(_grigioMetrics); + _waterMarkManager = UpsertWaterMarkManager.getInstance(); + _segmentDeletionHandler = new SegmentDeletionHandler(ImmutableList.of(_segmentUpdater)); + } else { + _waterMarkManager = new DefaultWaterMarkManager(); + _segmentDeletionHandler = new SegmentDeletionHandler(); + } + _inited.set(true); + } + + @Override + public SegmentDeletionHandler getSegmentDeletionHandler() { + Preconditions.checkState(_inited.get(), "upsert container is not initialized yet"); + return _segmentDeletionHandler; + } + + @Override + public WaterMarkManager getWatermarkManager() { + return _waterMarkManager; + } + + @Override + public synchronized void startBackgroundThread() { + if (_segmentUpdater != null) { + _segmentUpdater.start(); + } + } + + @Override + public synchronized void stopBackgroundThread() { + if (_segmentUpdater != null) { + LOGGER.info("closing segment updater"); + _segmentUpdater.shutdown(); + } + } + + @Override + public boolean isUpsertEnabled() { + return _isUpsertEnabled; + } + + public synchronized void shutdown() { + if (_keyCoordinatorProvider != null) { + LOGGER.info("shutting down key coordinator provider"); + _keyCoordinatorProvider.close(); + } + } + + private void initVirtualColumnStorageProvider(Configuration conf) { + UpdateLogStorageProvider.init(conf.subset(STORAGE_CONFIG_KEY)); + } + + public KeyCoordinatorProvider buildKeyCoordinatorProvider(Configuration conf, String hostname) { + return new KeyCoordinatorProvider(conf.subset(KC_CONFIG_KEY), hostname, _grigioMetrics); + } + + public SegmentUpdaterProvider buildSegmentUpdaterProvider(Configuration conf, String hostname) { + return new SegmentUpdaterProvider(conf.subset(UPDATER_CONFIG_KEY), hostname, _grigioMetrics); + } + + public SegmentUpdater buildSegmentUpdater(Configuration conf, SegmentUpdaterProvider updateProvider, + UpdateLogRetentionManager updateLogRetentionManager) { + return new SegmentUpdater(conf.subset(UPDATER_CONFIG_KEY), updateProvider, updateLogRetentionManager, + _grigioMetrics); + } +} diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java similarity index 90% rename from pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java rename to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java index faad27b..266f25d2 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWatermarkManager.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/core/segment/updater/UpsertWaterMarkManager.java @@ -31,26 +31,26 @@ import org.slf4j.LoggerFactory; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -public class UpsertWatermarkManager implements WatermarkManager { +public class UpsertWaterMarkManager implements WaterMarkManager { private final Map<String, Map<Integer, Long>> _highWaterMarkTablePartitionMap = new ConcurrentHashMap<>(); private final GrigioMetrics _metrics; - private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWatermarkManager.class); - private static volatile UpsertWatermarkManager _instance; + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertWaterMarkManager.class); + private static volatile UpsertWaterMarkManager _instance; - private UpsertWatermarkManager(GrigioMetrics metrics) { + private UpsertWaterMarkManager(GrigioMetrics metrics) { _metrics = metrics; } public static void init(GrigioMetrics metrics) { - synchronized (UpsertWatermarkManager.class) { + synchronized (UpsertWaterMarkManager.class) { Preconditions.checkState(_instance == null, "upsert water mark manager is already init"); - _instance = new UpsertWatermarkManager(metrics); + _instance = new UpsertWaterMarkManager(metrics); } } - public static UpsertWatermarkManager getInstance() { + public static UpsertWaterMarkManager getInstance() { Preconditions.checkState(_instance != null, "upsert water mark manager is not yet init"); return _instance; } diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkServiceTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkServiceTest.java new file mode 100644 index 0000000..611f257 --- /dev/null +++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkServiceTest.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.broker.upsert; + +import com.github.tomakehurst.wiremock.WireMockServer; +import com.github.tomakehurst.wiremock.client.WireMock; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.examples.MasterSlaveStateModelFactory; +import org.apache.helix.model.MasterSlaveSMD; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.pinot.broker.broker.helix.HelixBrokerStarter; +import org.apache.pinot.common.config.TagNameUtils; +import org.apache.pinot.common.restlet.resources.ResourceUtils; +import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.common.utils.ZkStarter; +import org.apache.pinot.controller.helix.ControllerTest; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixAdmin; +import org.testng.Assert; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.get; +import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor; +import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo; +import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE; + +public class PollingBasedLowWaterMarkServiceTest extends ControllerTest { + private PinotHelixResourceManager _pinotResourceManager; + private static final String HELIX_CLUSTER_NAME = "TestLowWaterMarksPolling"; + private final Configuration _pinotHelixBrokerProperties = new PropertiesConfiguration(); + + private HelixAdmin _helixAdmin; + private HelixBrokerStarter _helixBrokerStarter; + +// @Test + public void testBrokerCallServersCorrectly() + throws Exception { + ZkStarter.startLocalZkServer(); + final String instanceId = "localhost_helixController"; + _pinotResourceManager = + new PinotHelixResourceManager(ZkStarter.DEFAULT_ZK_STR, HELIX_CLUSTER_NAME, null, 10000L, + true, /*isUpdateStateModel=*/false, true); + HelixManager helixManager = registerAndConnectAsHelixParticipant(HELIX_CLUSTER_NAME, instanceId, ZkStarter.DEFAULT_ZK_STR); + _pinotResourceManager.start(helixManager); + _helixAdmin = _pinotResourceManager.getHelixAdmin(); + + // Set up a cluster with one controller and 2 servers. + addFakeBrokerInstancesToAutoJoinHelixCluster(1, true); + addFakeServerInstancesToAutoJoinHelixCluster(2, true); + + + _pinotHelixBrokerProperties.addProperty(CommonConstants.Helix.KEY_OF_BROKER_QUERY_PORT, 8943); + _pinotHelixBrokerProperties + .addProperty(CommonConstants.Broker.CONFIG_OF_BROKER_REFRESH_TIMEBOUNDARY_INFO_SLEEP_INTERVAL, 100L); + + + // Set the two servers' lwms info. + Map<Integer, Long> table1Map = new HashMap<>(); + table1Map.put(0, 10L); + table1Map.put(1, 20L); + Map<Integer, Long> table2Map = new HashMap<>(); + table2Map.put(0, 11L); + Map<String, Map<Integer, Long>> server1LwmsMap = new ConcurrentHashMap<>(); + server1LwmsMap.put("Table1", table1Map); + server1LwmsMap.put("Table2", table2Map); + + Map<Integer, Long> newTable1Map = new HashMap<>(); + newTable1Map.put(0, 15L); + newTable1Map.put(1, 18L); + Map<Integer, Long> table3Map = new HashMap<>(); + table3Map.put(0, 17L); + Map<String, Map<Integer, Long>> server2LwmsMap = new HashMap<>(); + server2LwmsMap.put("Table1", newTable1Map); + server2LwmsMap.put("Table3", table3Map); + + + WireMockServer mockServer1 = new WireMockServer(1); + mockServer1.start(); + mockServer1.stubFor(WireMock.get(WireMock.urlEqualTo("/lwms")).willReturn(WireMock.aResponse() + .withBody(ResourceUtils.convertToJsonString(new TableLowWaterMarksInfo(server1LwmsMap))) + .withHeader("Content-Type", "application/json") + .withStatus(200))); + WireMockServer mockServer2 = new WireMockServer(2); + mockServer2.start(); + mockServer2.stubFor(WireMock.get(WireMock.urlEqualTo("/lwms")).willReturn(WireMock.aResponse() + .withBody(ResourceUtils.convertToJsonString(new TableLowWaterMarksInfo(server2LwmsMap))) + .withHeader("Content-Type", "application/json") + .withStatus(200))); + + _helixBrokerStarter = + new HelixBrokerStarter(_pinotHelixBrokerProperties, HELIX_CLUSTER_NAME, ZkStarter.DEFAULT_ZK_STR); + + + while (_helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_OFFLINE").size() == 0 + || _helixAdmin.getInstancesInClusterWithTag(HELIX_CLUSTER_NAME, "DefaultTenant_BROKER").size() == 0) { + Thread.sleep(100); + } + + Thread.sleep(1000); + + // Verify the low water mark service behaviors. + mockServer1.verify(1, WireMock.getRequestedFor(WireMock.urlEqualTo("/lwms"))); + mockServer2.verify(1, WireMock.getRequestedFor(WireMock.urlEqualTo("/lwms"))); + + Assert.assertNotNull(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1")); + Assert.assertNotNull(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table2")); + Assert.assertNotNull(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table3")); + + // Table 1 verification. + Assert.assertEquals(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1").size(), 2); + Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1").get(Integer.parseInt("0")) == 10L); + Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table1").get(Integer.parseInt("1")) == 18L); + // Table 1 verification. + Assert.assertEquals(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table2").size(), 1); + Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table2").get(Integer.parseInt("0")) == 11L); + // Table 1 verification. + Assert.assertEquals(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table3").size(), 1); + Assert.assertTrue(_helixBrokerStarter.getLwmService().getLowWaterMarks("Table3").get(Integer.parseInt("0")) == 17L); + } + +// @Test + public void testLowWaterMarksMerge() { + Map<Integer, Long> table1Map = new HashMap<>(); + table1Map.put(0, 10L); + table1Map.put(1, 20L); + Map<Integer, Long> table2Map = new HashMap<>(); + table2Map.put(0, 11L); + Map<String, Map<Integer, Long>> currentLwmsMap = new ConcurrentHashMap<>(); + currentLwmsMap.put("Table1", table1Map); + currentLwmsMap.put("Table2", table2Map); + + Map<Integer, Long> newTable1Map = new HashMap<>(); + newTable1Map.put(0, 15L); + newTable1Map.put(1, 18L); + Map<Integer, Long> table3Map = new HashMap<>(); + table3Map.put(0, 17L); + Map<String, Map<Integer, Long>> serverLwms = new HashMap<>(); + serverLwms.put("Table1", newTable1Map); + serverLwms.put("Table3", table3Map); + + PollingBasedLowWaterMarkService.LwmMerger.updateLowWaterMarks(currentLwmsMap, serverLwms); + + Assert.assertEquals(currentLwmsMap.size(), 3); + + // Verify Table1 content. + Assert.assertTrue(currentLwmsMap.containsKey("Table1")); + Map<Integer, Long> lwmsMap1 = currentLwmsMap.get("Table1"); + Assert.assertEquals(lwmsMap1.size(), 2); + // Verify that the lower LWM value is chosen in the combined results. + Assert.assertTrue(lwmsMap1.get(0) == 10L); + Assert.assertTrue(lwmsMap1.get(1) == 18L); + + // Verify Table2 content. + Assert.assertTrue(currentLwmsMap.containsKey("Table2")); + Map<Integer, Long> lwmsMap2 = currentLwmsMap.get("Table2"); + Assert.assertEquals(lwmsMap2.size(), 1); + // Verify that the lower LWM value is chosen in the combined results. + Assert.assertTrue(lwmsMap2.get(0) == 11L); + + // Verify Table3 content. + Assert.assertTrue(currentLwmsMap.containsKey("Table3")); + Map<Integer, Long> lwmsMap3 = currentLwmsMap.get("Table3"); + Assert.assertEquals(lwmsMap3.size(), 1); + // Verify that the lower LWM value is chosen in the combined results. + Assert.assertTrue(lwmsMap3.get(0) == 17L); + } + + /** + * Register and connect to Helix cluster as PARTICIPANT role. + */ + private HelixManager registerAndConnectAsHelixParticipant(String helixClusterName, String instanceId, String helixZkURL) { + HelixManager helixManager = + HelixManagerFactory.getZKHelixManager(helixClusterName, instanceId, InstanceType.PARTICIPANT, helixZkURL); + + // Registers Master-Slave state model to state machine engine, which is for calculating participant assignment in lead controller resource. + helixManager.getStateMachineEngine() + .registerStateModelFactory(MasterSlaveSMD.name, new MasterSlaveStateModelFactory()); + + try { + helixManager.connect(); + return helixManager; + } catch (Exception e) { + String errorMsg = + String.format("Exception when connecting the instance %s as Participant to Helix.", instanceId); + throw new RuntimeException(errorMsg); + } + } + +} diff --git a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java index 979e5c1..e2e270d 100644 --- a/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java +++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/core/data/manager/upsert/UpsertImmutableIndexSegmentCallbackTest.java @@ -18,7 +18,7 @@ */ package org.apache.pinot.core.data.manager.upsert; -import org.apache.pinot.core.segment.updater.UpsertWatermarkManager; +import org.apache.pinot.core.segment.updater.UpsertWaterMarkManager; import org.apache.pinot.core.segment.virtualcolumn.mutable.VirtualColumnLongValueReaderWriter; import org.apache.pinot.grigio.common.messages.LogEventType; import org.apache.pinot.grigio.common.storageProvider.UpdateLogEntry; @@ -40,13 +40,13 @@ import static org.mockito.Mockito.when; public class UpsertImmutableIndexSegmentCallbackTest { UpdateLogStorageProvider _mockProvider; - UpsertWatermarkManager _mockUpsertWatermarkManager; + UpsertWaterMarkManager _mockUpsertWatermarkManager; List<VirtualColumnLongValueReaderWriter> _readerWriters = new ArrayList<>(); @BeforeMethod public void init() { _mockProvider = mock(UpdateLogStorageProvider.class); - _mockUpsertWatermarkManager = mock(UpsertWatermarkManager.class); + _mockUpsertWatermarkManager = mock(UpsertWaterMarkManager.class); } @Test diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java index 6a626cc..8f30ca8 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java @@ -332,6 +332,8 @@ public class ClusterIntegrationTestUtils { properties.put("serializer.class", "kafka.serializer.DefaultEncoder"); properties.put("request.required.acks", "1"); properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner"); + properties.put("max.request.size", "300000000"); + properties.put("buffer.memory", "300000000"); StreamDataProducer producer = StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME, properties); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/LowWatermarksResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/LowWatermarksResource.java new file mode 100644 index 0000000..b346066 --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/LowWatermarksResource.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api.resources; + +import org.apache.pinot.common.restlet.resources.ResourceUtils; +import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo; +import org.apache.pinot.core.segment.updater.WaterMarkManager; +import org.apache.pinot.server.starter.ServerInstance; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +@Api(tags = "LowWaterMarks") +@Path("/") +public class LowWatermarksResource { + + @Inject + ServerInstance serverInstance; + + @GET + @Produces(MediaType.APPLICATION_JSON) + @Path("/lwms") + @ApiOperation(value = "Show the lwms of tables ", notes = "Returns the lwms of all upsert enable tables in this server") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), + @ApiResponse(code = 500, message = "Internal server error"), + }) + public String getLowWaterMarks() { + WaterMarkManager watermarkManager = serverInstance.getWatermarkManager(); + + if (watermarkManager == null) { + throw new WebApplicationException("Invalid server initialization", Response.Status.INTERNAL_SERVER_ERROR); + } + return ResourceUtils.convertToJsonString( + new TableLowWaterMarksInfo(watermarkManager.getHighWaterMarkTablePartitionMap())); + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java new file mode 100644 index 0000000..13a7612 --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/UpsertDebugResource.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.api.resources; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.SegmentDataManager; +import org.apache.pinot.core.data.manager.TableDataManager; +import org.apache.pinot.core.data.manager.UpsertSegmentDataManager; +import org.apache.pinot.server.starter.ServerInstance; + +import javax.inject.Inject; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Api(tags = "UpsertDebug") +@Path("/") +public class UpsertDebugResource { + + @Inject + ServerInstance serverInstance; + + @GET + @Produces(MediaType.TEXT_PLAIN) + @Path("/upsert/{tableName}/{segmentName}/{offset}") + @ApiOperation(value = "$validFrom and $validUntil value", notes = "") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success", response = String.class), + @ApiResponse(code = 500, message = "Internal server error"), + }) + public String getUpsertDataAtOffset( + @ApiParam(value = "Table name including type", required = true, example = "myTable_REALTIME") @PathParam("tableName") String tableName, + @ApiParam(value = "segment name", required = true, example = "eats_supply_update__0__0__20190923T0700Z") @PathParam("segmentName") String segmentName, + @ApiParam(value = "offset", required = true, example = "100") @PathParam("offset") String offsetStr + ) { + if (!serverInstance.isUpsertEnabled()) { + return "not an upsert server"; + } + InstanceDataManager instanceDataManager = serverInstance.getInstanceDataManager(); + TableDataManager tableDataManager = instanceDataManager.getTableDataManager(tableName); + if (tableDataManager == null) { + return "no table for " + tableName; + } + SegmentDataManager segmentDataManager = null; + try { + segmentDataManager = tableDataManager.acquireSegment(segmentName); + if (segmentDataManager == null) { + return "cannot find associate segment for segment " + segmentName; + } + if (!(segmentDataManager instanceof UpsertSegmentDataManager)) { + return "it is not an upsert table"; + } else { + return ((UpsertSegmentDataManager) segmentDataManager).getVirtualColumnInfo(Long.parseLong(offsetStr)); + } + } finally { + if (segmentDataManager != null) { + tableDataManager.releaseSegment(segmentDataManager); + } + } + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java index 5610a4a..d2adcb3 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java @@ -20,8 +20,8 @@ package org.apache.pinot.server.conf; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationException; -import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.server.upsert.DefaultUpsertComponentContainer; /** @@ -45,6 +45,11 @@ public class ServerConf { private static final String PINOT_QUERY_SCHEDULER_PREFIX = "pinot.query.scheduler"; + public static final String UPSERT_CONFIG_PARENT = "pinot.server.upsert"; + public static final String UPSERT_COMPONENT_CONFIG_KEY = "pinot.server.upsertComponent.class"; + public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName(); + + private Configuration _serverConf; public ServerConf(Configuration serverConfig) { @@ -92,6 +97,14 @@ public class ServerConf { return _serverConf.subset(PINOT_QUERY_SCHEDULER_PREFIX); } + public String getUpsertComponentContainerClassName() { + return _serverConf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT); + } + + public Configuration getUpsertConfig() { + return _serverConf.subset(UPSERT_CONFIG_PARENT); + } + /** * Returns an array of transform function names as defined in the config * @return String array of transform functions diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java index 11e2326..845a90b 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java @@ -26,14 +26,18 @@ import java.util.concurrent.atomic.LongAccumulator; import org.apache.helix.HelixManager; import org.apache.pinot.common.metrics.MetricsHelper; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.core.segment.updater.SegmentDeletionHandler; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.scheduler.QueryScheduler; import org.apache.pinot.core.query.scheduler.QuerySchedulerFactory; +import org.apache.pinot.core.segment.updater.WaterMarkManager; import org.apache.pinot.core.transport.QueryServer; import org.apache.pinot.server.conf.ServerConf; +import org.apache.pinot.core.segment.updater.UpsertComponentContainer; +import org.apache.pinot.server.upsert.UpsertComponentContainerProvider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +49,7 @@ import org.slf4j.LoggerFactory; public class ServerInstance { private static final Logger LOGGER = LoggerFactory.getLogger(ServerInstance.class); + private final ServerConf _serverConf; private final ServerMetrics _serverMetrics; private final InstanceDataManager _instanceDataManager; private final QueryExecutor _queryExecutor; @@ -52,12 +57,16 @@ public class ServerInstance { private final QueryScheduler _queryScheduler; private final QueryServer _queryServer; + // upsert related component, only initialize if necessary + private UpsertComponentContainer _upsertComponentContainer; + private boolean _started = false; - public ServerInstance(ServerConf serverConf, HelixManager helixManager) + public ServerInstance(ServerConf serverConf, HelixManager helixManager, String clusterName, String instanceName) throws Exception { LOGGER.info("Initializing server instance"); + _serverConf = serverConf; LOGGER.info("Initializing server metrics"); MetricsHelper.initializeMetrics(serverConf.getMetricsConfig()); MetricsRegistry metricsRegistry = new MetricsRegistry(); @@ -97,12 +106,19 @@ public class ServerInstance { } TransformFunctionFactory.init(transformFunctionClasses); + + final UpsertComponentContainerProvider upsertComponentContainerProvider = new UpsertComponentContainerProvider(serverConf); + _upsertComponentContainer = upsertComponentContainerProvider.getInstance(); + _upsertComponentContainer.registerMetrics(_serverConf.getMetricsPrefix(), metricsRegistry); + _upsertComponentContainer.init(_serverConf.getUpsertConfig(), helixManager, clusterName, instanceName); + LOGGER.info("Finish initializing server instance"); } public synchronized void start() { // This method is called when Helix starts a new ZK session, and can be called multiple times. We only need to start // the server instance once, and simply ignore the following invocations. + LOGGER.info("Starting server instance"); if (_started) { LOGGER.info("Server instance is already running, skipping the start"); return; @@ -127,12 +143,16 @@ public class ServerInstance { Preconditions.checkState(_started, "Server instance is not running"); LOGGER.info("Shutting down server instance"); + _upsertComponentContainer.stopBackgroundThread(); LOGGER.info("Shutting down query server"); _queryServer.shutDown(); LOGGER.info("Shutting down query scheduler"); _queryScheduler.stop(); LOGGER.info("Shutting down query executor"); _queryExecutor.shutDown(); + LOGGER.info("Shutting down upsert components if necessary"); + _upsertComponentContainer.shutdown(); + LOGGER.info("Shutting down instance data manager"); _instanceDataManager.shutDown(); @@ -148,6 +168,23 @@ public class ServerInstance { return _instanceDataManager; } + public SegmentDeletionHandler getSegmentDeletionHandler() { + return _upsertComponentContainer.getSegmentDeletionHandler(); + } + + public WaterMarkManager getWatermarkManager() { + return _upsertComponentContainer.getWatermarkManager(); + } + + public void maybeStartUpsertBackgroundThread() { + LOGGER.info("starting upsert component background thread"); + _upsertComponentContainer.startBackgroundThread(); + } + + public boolean isUpsertEnabled() { + return _upsertComponentContainer.isUpsertEnabled(); + } + public long getLatestQueryTime() { return _latestQueryTime.get(); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 920b422..710e22a 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -307,13 +307,6 @@ public class HelixInstanceDataManager implements InstanceDataManager { } } - /* - @Override - public Map<String, Map<Integer, Long>> getLowWaterMarks() { - return UpsertWaterMarkManager.getInstance().getHighWaterMarkTablePartitionMap(); - } - */ - @Override public String getSegmentDataDirectory() { return _instanceDataManagerConfig.getInstanceDataDir(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java index c1ea182..402bb6b 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixServerStarter.java @@ -21,14 +21,6 @@ package org.apache.pinot.server.starter.helix; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.ConfigurationUtils; @@ -64,14 +56,57 @@ import org.apache.pinot.server.conf.ServerConf; import org.apache.pinot.server.realtime.ControllerLeaderLocator; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.server.starter.ServerInstance; -import org.apache.pinot.server.upsert.UpsertComponentContainer; -import org.apache.pinot.server.upsert.UpsertComponentContainerProvider; import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.common.utils.CommonConstants.Helix.*; -import static org.apache.pinot.common.utils.CommonConstants.Server.*; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.pinot.common.utils.CommonConstants.Helix.CONFIG_OF_SERVER_FLAPPING_TIME_WINDOW_MS; +import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_FLAPPING_TIME_WINDOW_MS; +import static org.apache.pinot.common.utils.CommonConstants.Helix.DEFAULT_SERVER_NETTY_PORT; +import static org.apache.pinot.common.utils.CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME; +import static org.apache.pinot.common.utils.CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS; +import static org.apache.pinot.common.utils.CommonConstants.Helix.Instance; +import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_HOST; +import static org.apache.pinot.common.utils.CommonConstants.Helix.KEY_OF_SERVER_NETTY_PORT; +import static org.apache.pinot.common.utils.CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE; +import static org.apache.pinot.common.utils.CommonConstants.Helix.StateModel; +import static org.apache.pinot.common.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_ADMIN_API_PORT; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_ID; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_TIMEOUT; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SERVER_MIN_RESOURCE_PERCENT_FOR_START; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_QUERY_CHECK; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_ENABLE_RESOURCE_CHECK; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_NO_QUERY_THRESHOLD_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_SHUTDOWN_TIMEOUT_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_ENABLE_SERVICE_STATUS_CHECK; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.CONFIG_OF_STARTUP_TIMEOUT_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SERVER_MIN_RESOURCE_PERCENT_FOR_START; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_QUERY_CHECK; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_ENABLE_RESOURCE_CHECK; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_RESOURCE_CHECK_INTERVAL_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_SHUTDOWN_TIMEOUT_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_ENABLE_SERVICE_STATUS_CHECK; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_REALTIME_CONSUMPTION_CATCHUP_WAIT_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_SERVICE_STATUS_CHECK_INTERVAL_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.DEFAULT_STARTUP_TIMEOUT_MS; +import static org.apache.pinot.common.utils.CommonConstants.Server.SegmentCompletionProtocol; /** @@ -138,11 +173,11 @@ public class HelixServerStarter { ServerSegmentCompletionProtocolHandler .init(_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER)); ServerConf serverInstanceConfig = DefaultHelixStarterServerConfig.getDefaultHelixServerConfig(_serverConf); - _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager); + _serverInstance = new ServerInstance(serverInstanceConfig, _helixManager, _helixClusterName, _instanceId); InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); SegmentFetcherAndLoader fetcherAndLoader = new SegmentFetcherAndLoader(_serverConf, instanceDataManager); - StateModelFactory<?> stateModelFactory = - new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, fetcherAndLoader); + StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager, + fetcherAndLoader, _serverInstance.getSegmentDeletionHandler()); _helixManager.getStateMachineEngine() .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory); // Start the server instance as a pre-connect callback so that it starts after connecting to the ZK in order to @@ -181,6 +216,8 @@ public class HelixServerStarter { long endTimeMs = startTimeMs + _serverConf.getLong(CONFIG_OF_STARTUP_TIMEOUT_MS, DEFAULT_STARTUP_TIMEOUT_MS); startupServiceStatusCheck(endTimeMs); } + _serverInstance.maybeStartUpsertBackgroundThread(); + setShuttingDownStatus(false); LOGGER.info("Pinot server ready"); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index 145e378..1eab7f5 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -19,8 +19,6 @@ package org.apache.pinot.server.starter.helix; import com.google.common.base.Preconditions; -import java.io.File; -import java.util.concurrent.locks.Lock; import org.apache.commons.io.FileUtils; import org.apache.helix.NotificationContext; import org.apache.helix.model.Message; @@ -39,10 +37,13 @@ import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.SegmentDataManager; import org.apache.pinot.core.data.manager.TableDataManager; import org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager; -import org.apache.pinot.server.upsert.SegmentDeletionHandler; +import org.apache.pinot.core.segment.updater.SegmentDeletionHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; +import java.util.concurrent.locks.Lock; + /** * Data Server layer state model to take over how to operate on: @@ -56,17 +57,18 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta private final SegmentFetcherAndLoader _fetcherAndLoader; private final SegmentDeletionHandler _segmentDeletionHandler; + public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager, SegmentFetcherAndLoader fetcherAndLoader) { - this(instanceId, instanceDataManager, fetcherAndLoader, new SegmentDeletionHandler()); + this(instanceId, instanceDataManager, fetcherAndLoader, new SegmentDeletionHandler()); } public SegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager, - SegmentFetcherAndLoader fetcherAndLoader, SegmentDeletionHandler segmentDeletionHandler) { + SegmentFetcherAndLoader fetcherAndLoader, SegmentDeletionHandler deletionHandler) { _instanceId = instanceId; _instanceDataManager = instanceDataManager; _fetcherAndLoader = fetcherAndLoader; - _segmentDeletionHandler = segmentDeletionHandler; + _segmentDeletionHandler = deletionHandler; } public static String getStateModelName() { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java index 63424ac..3841a2e 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/DefaultUpsertComponentContainer.java @@ -18,20 +18,25 @@ */ package org.apache.pinot.server.upsert; -import com.codahale.metrics.MetricRegistry; +import com.yammer.metrics.core.MetricsRegistry; import org.apache.commons.configuration.Configuration; +import org.apache.helix.HelixManager; +import org.apache.pinot.core.segment.updater.DefaultWaterMarkManager; +import org.apache.pinot.core.segment.updater.SegmentDeletionHandler; +import org.apache.pinot.core.segment.updater.UpsertComponentContainer; +import org.apache.pinot.core.segment.updater.WaterMarkManager; public class DefaultUpsertComponentContainer implements UpsertComponentContainer { private final SegmentDeletionHandler deletionHandler = new SegmentDeletionHandler(); + private final WaterMarkManager watermarkManager = new DefaultWaterMarkManager(); @Override - public void registerMetrics(MetricRegistry registry) { - + public void registerMetrics(String prefix, MetricsRegistry registry) { } @Override - public void init(Configuration config) { + public void init(Configuration config, HelixManager helixManager, String clusterName, String instanceName) { } @Override @@ -40,10 +45,29 @@ public class DefaultUpsertComponentContainer implements UpsertComponentContainer } @Override - public void start() { + public WaterMarkManager getWatermarkManager() { + return watermarkManager; + } + + @Override + public boolean isUpsertEnabled() { + return false; } @Override - public void stop() { + public void startBackgroundThread() { + } + + @Override + public void stopBackgroundThread() { + + } + + @Override + public void shutdown() { + + } + + } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java index fe4af07..7ae27a6 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/upsert/UpsertComponentContainerProvider.java @@ -19,10 +19,10 @@ package org.apache.pinot.server.upsert; import com.google.common.base.Preconditions; -import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.pinot.core.segment.updater.WatermarkManager; -import org.apache.pinot.grigio.common.metrics.GrigioMetrics; +import org.apache.pinot.core.segment.updater.UpsertComponentContainer; +import org.apache.pinot.core.segment.updater.WaterMarkManager; +import org.apache.pinot.server.conf.ServerConf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,22 +30,16 @@ public class UpsertComponentContainerProvider { private static final Logger LOGGER = LoggerFactory.getLogger(UpsertComponentContainerProvider.class); - public static final String UPSERT_COMPONENT_CONFIG_KEY = "watermarkManager.class"; - public static final String UPSERT_COMPONENT_CONFIG_DEFAULT = DefaultUpsertComponentContainer.class.getName(); - - private final Configuration _conf; private UpsertComponentContainer _instance; - public UpsertComponentContainerProvider(Configuration conf, GrigioMetrics metrics) { - _conf = conf; - String className = _conf.getString(UPSERT_COMPONENT_CONFIG_KEY, UPSERT_COMPONENT_CONFIG_DEFAULT); + public UpsertComponentContainerProvider(ServerConf serverConf) { + String className = serverConf.getUpsertComponentContainerClassName(); LOGGER.info("creating watermark manager with class {}", className); try { Class<UpsertComponentContainer> comonentContainerClass = (Class<UpsertComponentContainer>) Class.forName(className); - Preconditions.checkState(comonentContainerClass.isAssignableFrom(WatermarkManager.class), + Preconditions.checkState(comonentContainerClass.isAssignableFrom(WaterMarkManager.class), "configured class not assignable from Callback class"); _instance = comonentContainerClass.newInstance(); - _instance.init(_conf); } catch (Exception e) { LOGGER.error("failed to load watermark manager class", className, e); ExceptionUtils.rethrow(e); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java b/pinot-server/src/test/java/org/apache/pinot/server/api/resources/LowWatermarksResourceTest.java similarity index 60% rename from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java rename to pinot-server/src/test/java/org/apache/pinot/server/api/resources/LowWatermarksResourceTest.java index 01579e7..b007c90 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/WatermarkManager.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/resources/LowWatermarksResourceTest.java @@ -1,3 +1,4 @@ + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -16,18 +17,18 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.segment.updater; - -import org.apache.commons.configuration.Configuration; -import org.apache.pinot.grigio.common.metrics.GrigioMeter; -import org.apache.pinot.grigio.common.metrics.GrigioMetrics; - -import java.util.Map; - -public interface WatermarkManager { +package org.apache.pinot.server.api.resources; - void init(Configuration config, GrigioMetrics metrics); +import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo; +import org.apache.pinot.server.api.BaseResourceTest; +import org.testng.Assert; +import org.testng.annotations.Test; - Map<String, Map<Integer, Long>> getHighWaterMarkTablePartitionMap(); +public class LowWatermarksResourceTest extends BaseResourceTest { + @Test + public void testHappyPath() { + TableLowWaterMarksInfo lwms = _webTarget.path("lwms").request().get(TableLowWaterMarksInfo.class); + Assert.assertEquals(lwms.getTableLowWaterMarks().size(), 0); + } } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java index ac989bb..65cb9e7 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/realtime/provisioning/MemoryEstimator.java @@ -18,11 +18,6 @@ */ package org.apache.pinot.tools.realtime.provisioning; -import java.io.File; -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.config.TableConfig; @@ -40,6 +35,12 @@ import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig; import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory; import org.apache.pinot.core.segment.index.SegmentMetadataImpl; +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + /** * Given a sample segment, this class can estimate how much memory would be used per host, for various combinations of numHostsToProvision and numHoursToConsume @@ -126,7 +127,8 @@ public class MemoryEstimator { // create a config RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = - new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName()) + new RealtimeSegmentConfig.Builder().setTableName(_tableConfig.getTableName()) + .setSegmentName(_segmentMetadata.getName()) .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema()) .setCapacity(_segmentMetadata.getTotalDocs()).setAvgNumMultiValues(_avgMultiValues) .setNoDictionaryColumns(_noDictionaryColumns) @@ -228,7 +230,8 @@ public class MemoryEstimator { RealtimeSegmentZKMetadata segmentZKMetadata = getRealtimeSegmentZKMetadata(_segmentMetadata, totalDocs); RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder = - new RealtimeSegmentConfig.Builder().setSegmentName(_segmentMetadata.getName()) + new RealtimeSegmentConfig.Builder().setTableName(_tableConfig.getTableName()) + .setSegmentName(_segmentMetadata.getName()) .setStreamName(_tableNameWithType).setSchema(_segmentMetadata.getSchema()) .setCapacity(totalDocs).setAvgNumMultiValues(_avgMultiValues) .setNoDictionaryColumns(_noDictionaryColumns) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
