This is an automated email from the ASF dual-hosted git repository. jamesshao pushed a commit to branch upsert-refactor in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 18a2da4457a75ee34cbb3bb854371706e55f1b51 Author: james Shao <[email protected]> AuthorDate: Thu Mar 19 10:45:11 2020 -0700 more refactor --- .../requesthandler/BaseBrokerRequestHandler.java | 14 +-- .../requesthandler/LowWaterMarkQueryWriter.java | 109 --------------------- .../broker/upsert/DefaultLowWaterMarkService.java | 8 ++ ...ervice.java => DefaultUpsertQueryRewriter.java} | 33 ++----- .../apache/pinot/common/utils/CommonConstants.java | 2 +- .../core/segment/updater/LowWaterMarkService.java | 2 + ...erMarkService.java => UpsertQueryRewriter.java} | 27 ++--- .../upsert/PollingBasedLowWaterMarkService.java | 8 ++ .../broker/upsert/UpsertQueryRewriterImpl.java | 42 ++++++-- .../broker/upsert/UpsertQueryRewriterImplTest.java | 21 +++- 10 files changed, 83 insertions(+), 183 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 1d8f97b..cb17740 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -300,7 +300,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { if (shouldEnableLowWaterMarkRewrite(request)) { // Augment the realtime request with LowWaterMark constraints. - addLowWaterMarkToQuery(realtimeBrokerRequest, rawTableName); + _lwmService.getQueryRewriter().rewriteQueryForUpsert(realtimeBrokerRequest, rawTableName); } // Calculate routing table for the query @@ -786,18 +786,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } } - private void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, String rawTableName) { - final String realtimeTableName = rawTableName + "_REALTIME"; - Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName); - if (lowWaterMarks == null || lowWaterMarks.size() == 0) { - LOGGER.info("No low water marks info found for table {}", realtimeTableName); - return; - } - LOGGER.info("Found low water marks {} for table {}", String.valueOf(lowWaterMarks), realtimeTableName); - LowWaterMarkQueryWriter.addLowWaterMarkToQuery(realtimeBrokerRequest, lowWaterMarks); - LOGGER.info("Query augmented with LWMS info for table {} : {}", realtimeTableName, realtimeBrokerRequest); - } - /** * Processes the optimized broker requests for both OFFLINE and REALTIME table. */ diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java index c3285e5..b373243 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java @@ -39,113 +39,4 @@ public class LowWaterMarkQueryWriter { // Normal Pinot query node uses positive IDs. So lwm query node ids are all negative. private static final int QUERY_ID_BASE = -1000; - /** - * For upsert enabled tables, augment the realtime query with low water mark constraints in its filter query of the - * form - * ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1) - * - * @param realtimeBrokerRequest - * @param lowWaterMarks - */ - public static void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) { - if (lowWaterMarks == null || lowWaterMarks.size() == 0) { - LOGGER.warn("No low water mark info found for query: {}", realtimeBrokerRequest); - return; - } - - // Choose the min lwm among all partitions. - long minLwm = Collections.min(lowWaterMarks.values()); - - // 1. Build the low water mark query of the form for a table assuming lwm is the min LWM and -1 is used as - // uninitialized marker. - // ($validFrom <= lwm and $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1) - // -1 is used instead of Long.MAXVALUE because Pinot does not handle long arithmetic correctly. - FilterQuery lwmQuery = addSinglePartitionLowWaterMark(QUERY_ID_BASE - 1, realtimeBrokerRequest, minLwm); - - // 2. Attach low water mark filter to the current filters. - FilterQuery currentFilterQuery = realtimeBrokerRequest.getFilterQuery(); - if (currentFilterQuery != null) { - // Make an AND query of lwmQuery and the existing query. - FilterQuery andFilterQuery = new FilterQuery(); - // Make sure we do not reuse any query id in lwmQuerys. - andFilterQuery.setId(QUERY_ID_BASE); - andFilterQuery.setOperator(FilterOperator.AND); - List<Integer> nestedFilterQueryIds = new ArrayList<>(2); - nestedFilterQueryIds.add(currentFilterQuery.getId()); - nestedFilterQueryIds.add(lwmQuery.getId()); - andFilterQuery.setNestedFilterQueryIds(nestedFilterQueryIds); - - realtimeBrokerRequest.setFilterQuery(andFilterQuery); - FilterQueryMap filterSubQueryMap = realtimeBrokerRequest.getFilterSubQueryMap(); - filterSubQueryMap.putToFilterQueryMap(lwmQuery.getId(), lwmQuery); - filterSubQueryMap.putToFilterQueryMap(andFilterQuery.getId(), andFilterQuery); - } else { - realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery); - realtimeBrokerRequest.setFilterQuery(lwmQuery); - } - } - - /** - * - * @param queryIdBase The starting id that will be assigned to the first query created in ths method. - * @param realtimeBrokerRequest - * @param lwm low water mark. - * @return a filter query corresponding to the low water mark constraint of a single partition. The general form is: - * ($ValidFrom <= lwm && $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1) - */ - private static FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest, - Long lwm) { - // ValidFromQuery: ($ValidFrom <= lwm && $validFrom > -1) - FilterQuery validFromFilterQuery = new FilterQuery(); - // Important: Always decrement queryIdBase value after use to avoid id conflict. - validFromFilterQuery.setId(queryIdBase--); - validFromFilterQuery.setOperator(FilterOperator.AND); - FilterQuery validFromP1 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(*\t\t" + lwm + "]", FilterOperator.RANGE, realtimeBrokerRequest); - FilterQuery validFromP2 = getLeafFilterQuery(VALID_FROM, queryIdBase--, "(-1\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest); - List<Integer> nestedQueriesIdForValidFrom = new ArrayList<>(); - nestedQueriesIdForValidFrom.add(validFromP1.getId()); - nestedQueriesIdForValidFrom.add(validFromP2.getId()); - validFromFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidFrom); - - // ValidUtilQuery: (lwm < $validUtil OR $validUtil = -1) - FilterQuery validUtilFilterQuery = new FilterQuery(); - validUtilFilterQuery.setId(queryIdBase--); - validUtilFilterQuery.setOperator(FilterOperator.OR); - - FilterQuery validUtilP1 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "(" + lwm + "\t\t*)", FilterOperator.RANGE, realtimeBrokerRequest); - FilterQuery validUtilP2 = getLeafFilterQuery(VALID_UNTIL, queryIdBase--, "-1", FilterOperator.EQUALITY, realtimeBrokerRequest); - List<Integer> nestedQueriesIdForValidUtil = new ArrayList<>(); - nestedQueriesIdForValidUtil.add(validUtilP1.getId()); - nestedQueriesIdForValidUtil.add(validUtilP2.getId()); - validUtilFilterQuery.setNestedFilterQueryIds(nestedQueriesIdForValidUtil); - - // Top level query: ValidFromQuery AND ValidUtilQuery - FilterQuery lwmQuery = new FilterQuery(); - lwmQuery.setId(queryIdBase--); - lwmQuery.setOperator(FilterOperator.AND); - List<Integer> nestQids = new ArrayList<>(); - nestQids.add(validFromFilterQuery.getId()); - nestQids.add(validUtilFilterQuery.getId()); - lwmQuery.setNestedFilterQueryIds(nestQids); - - // Add all the new created queries to the query map. - realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(lwmQuery.getId(), lwmQuery); - realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validFromFilterQuery.getId(), validFromFilterQuery); - realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(validUtilFilterQuery.getId(), validUtilFilterQuery); - return lwmQuery; - } - - private static FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator, - BrokerRequest realtimeBrokerRequest) { - FilterQuery filterQuery = new FilterQuery(); - filterQuery.setColumn(column); - filterQuery.setId(id); - filterQuery.setValue(Collections.singletonList(value)); - filterQuery.setOperator(operator); - if (realtimeBrokerRequest.getFilterSubQueryMap() == null) { - realtimeBrokerRequest.setFilterSubQueryMap(new FilterQueryMap()); - } - realtimeBrokerRequest.getFilterSubQueryMap().putToFilterQueryMap(id, filterQuery); - return filterQuery; - } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java index 94b3be4..42e1dcb 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java @@ -22,11 +22,14 @@ import com.google.common.collect.ImmutableMap; import org.apache.helix.HelixDataAccessor; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.core.segment.updater.LowWaterMarkService; +import org.apache.pinot.core.segment.updater.UpsertQueryRewriter; import java.util.Map; public class DefaultLowWaterMarkService implements LowWaterMarkService { + private UpsertQueryRewriter upsertQueryRewriter = new DefaultUpsertQueryRewriter(); + @Override public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort){ @@ -44,4 +47,9 @@ public class DefaultLowWaterMarkService implements LowWaterMarkService { @Override public void start(BrokerMetrics brokerMetrics) { } + + @Override + public UpsertQueryRewriter getQueryRewriter() { + return upsertQueryRewriter; + } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java similarity index 56% copy from pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java copy to pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java index 94b3be4..d18a56f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultLowWaterMarkService.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/upsert/DefaultUpsertQueryRewriter.java @@ -1,3 +1,8 @@ +package org.apache.pinot.broker.upsert; + +import org.apache.pinot.common.request.BrokerRequest; +import org.apache.pinot.core.segment.updater.UpsertQueryRewriter; + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -16,32 +21,10 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.broker.upsert; - -import com.google.common.collect.ImmutableMap; -import org.apache.helix.HelixDataAccessor; -import org.apache.pinot.common.metrics.BrokerMetrics; -import org.apache.pinot.core.segment.updater.LowWaterMarkService; - -import java.util.Map; - -public class DefaultLowWaterMarkService implements LowWaterMarkService { - - @Override - public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, - int serverPort){ - } - - @Override - public Map<Integer, Long> getLowWaterMarks(String tableName) { - return ImmutableMap.of(); - } - - @Override - public void shutDown() { - } +public class DefaultUpsertQueryRewriter implements UpsertQueryRewriter { @Override - public void start(BrokerMetrics brokerMetrics) { + public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) { + // do nothing } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java index 0e65779..f5eeb15 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java @@ -179,7 +179,7 @@ public class CommonConstants { public static final String CONFIG_OF_BROKER_POLLING_SERVER_LWMS_SERVER_PORT = "pinot.broker.query.polling.server.lwms.port"; public static final String CONFIG_OF_BROKER_LWMS_CLASS_NAME = "pinot.broker.lwms.classname"; public static final String CONFIG_OF_BROKER_LWM_REWRITE_ENABLE = "pinot.broker.query.lwm.rewrite"; - public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = true; + public static final boolean CONFIG_OF_BROKER_LWM_REWRITE_ENABLE_DEFAULT = false; public static class Request { public static final String PQL = "pql"; public static final String SQL = "sql"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java index 2fd7434..0c6756c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java @@ -39,4 +39,6 @@ public interface LowWaterMarkService { // start void start(BrokerMetrics brokerMetrics); + + UpsertQueryRewriter getQueryRewriter(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java similarity index 54% copy from pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java copy to pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java index 2fd7434..64a64d0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/LowWaterMarkService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/updater/UpsertQueryRewriter.java @@ -1,3 +1,7 @@ +package org.apache.pinot.core.segment.updater; + +import org.apache.pinot.common.request.BrokerRequest; + /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -16,27 +20,8 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.segment.updater; - -import org.apache.helix.HelixDataAccessor; -import org.apache.pinot.common.metrics.BrokerMetrics; - -import java.util.Map; - -/** - * LowWaterMarkService keeps records of the low water mark (i.e., the stream ingestion progress) for each partition of - * an input table. - */ -public interface LowWaterMarkService { - - void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort); - - // Return the low water mark mapping from partition id to the corresponding low water mark of a given table. - Map<Integer, Long> getLowWaterMarks(String tableName); +public interface UpsertQueryRewriter { - // Shutdown the service. - void shutDown(); + void rewriteQueryForUpsert(BrokerRequest request, String rawTableName); - // start - void start(BrokerMetrics brokerMetrics); } diff --git a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java index f7e88c9..74e4b41 100644 --- a/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/PollingBasedLowWaterMarkService.java @@ -30,6 +30,7 @@ import org.apache.pinot.common.metrics.BrokerMeter; import org.apache.pinot.common.metrics.BrokerMetrics; import org.apache.pinot.common.restlet.resources.TableLowWaterMarksInfo; import org.apache.pinot.core.segment.updater.LowWaterMarkService; +import org.apache.pinot.core.segment.updater.UpsertQueryRewriter; import org.glassfish.jersey.client.ClientProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService { private int _serverPort; private boolean _shuttingDown; private BrokerMetrics _brokerMetrics; + private UpsertQueryRewriter _queryRewriter; @Override public void init(HelixDataAccessor helixDataAccessor, String helixClusterName, int serverPollingInterval, int serverPort) { @@ -75,6 +77,7 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService { _cacheInstanceConfigsDataAccessor = new ZkCacheBaseDataAccessor<>((ZkBaseDataAccessor<ZNRecord>) helixDataAccessor.getBaseDataAccessor(), instanceConfigs, null, Collections.singletonList(instanceConfigs)); + _queryRewriter = new UpsertQueryRewriterImpl(this); _tableLowWaterMarks = new ConcurrentHashMap<>(); _httpClient = ClientBuilder.newClient(); _httpClient.property(ClientProperties.CONNECT_TIMEOUT, SERVER_CONNENCT_TIMEOUT_MS); @@ -101,6 +104,11 @@ public class PollingBasedLowWaterMarkService implements LowWaterMarkService { } @Override + public UpsertQueryRewriter getQueryRewriter() { + return _queryRewriter; + } + + @Override public Map<Integer, Long> getLowWaterMarks(String tableName) { return _tableLowWaterMarks.get(tableName); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java similarity index 80% copy from pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java copy to pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java index c3285e5..90fce7c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriter.java +++ b/pinot-grigio/pinot-grigio-provided/src/main/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImpl.java @@ -16,29 +16,51 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.broker.requesthandler; +package org.apache.pinot.broker.upsert; +import org.apache.pinot.common.config.TableNameBuilder; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.FilterOperator; import org.apache.pinot.common.request.FilterQuery; import org.apache.pinot.common.request.FilterQueryMap; +import org.apache.pinot.common.utils.CommonConstants; +import org.apache.pinot.core.segment.updater.LowWaterMarkService; +import org.apache.pinot.core.segment.updater.UpsertQueryRewriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -// Add a lwm query to the filter query of a Pinot query for upsert enabled table. -// Thread-Safe -public class LowWaterMarkQueryWriter { - private static final Logger LOGGER = LoggerFactory.getLogger(LowWaterMarkQueryWriter.class); +public class UpsertQueryRewriterImpl implements UpsertQueryRewriter { + private static final Logger LOGGER = LoggerFactory.getLogger(UpsertQueryRewriterImpl.class); + + protected final LowWaterMarkService _lwmService; private static final String VALID_FROM = "$validFrom"; private static final String VALID_UNTIL = "$validUntil"; // Normal Pinot query node uses positive IDs. So lwm query node ids are all negative. private static final int QUERY_ID_BASE = -1000; + public UpsertQueryRewriterImpl(LowWaterMarkService lwmService) { + _lwmService = lwmService; + } + + @Override + public void rewriteQueryForUpsert(BrokerRequest request, String rawTableName) { + final String realtimeTableName = TableNameBuilder.ensureTableNameWithType(rawTableName, + CommonConstants.Helix.TableType.REALTIME); + Map<Integer, Long> lowWaterMarks = _lwmService.getLowWaterMarks(realtimeTableName); + if (lowWaterMarks == null || lowWaterMarks.size() == 0) { + LOGGER.info("No low water marks info found for table {}", realtimeTableName); + return; + } + LOGGER.info("Found low water marks {} for table {}", String.valueOf(lowWaterMarks), realtimeTableName); + addLowWaterMarkToQuery(request, lowWaterMarks); + LOGGER.info("Query augmented with LWMS info for table {} : {}", realtimeTableName, request); + } + /** * For upsert enabled tables, augment the realtime query with low water mark constraints in its filter query of the * form @@ -47,7 +69,7 @@ public class LowWaterMarkQueryWriter { * @param realtimeBrokerRequest * @param lowWaterMarks */ - public static void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) { + public void addLowWaterMarkToQuery(BrokerRequest realtimeBrokerRequest, Map<Integer, Long> lowWaterMarks) { if (lowWaterMarks == null || lowWaterMarks.size() == 0) { LOGGER.warn("No low water mark info found for query: {}", realtimeBrokerRequest); return; @@ -93,7 +115,7 @@ public class LowWaterMarkQueryWriter { * @return a filter query corresponding to the low water mark constraint of a single partition. The general form is: * ($ValidFrom <= lwm && $validFrom > -1) AND (lwm < $validUtil OR $validUtil = -1) */ - private static FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest, + private FilterQuery addSinglePartitionLowWaterMark(int queryIdBase, BrokerRequest realtimeBrokerRequest, Long lwm) { // ValidFromQuery: ($ValidFrom <= lwm && $validFrom > -1) FilterQuery validFromFilterQuery = new FilterQuery(); @@ -135,7 +157,7 @@ public class LowWaterMarkQueryWriter { return lwmQuery; } - private static FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator, + private FilterQuery getLeafFilterQuery(String column, int id, String value, FilterOperator operator, BrokerRequest realtimeBrokerRequest) { FilterQuery filterQuery = new FilterQuery(); filterQuery.setColumn(column); diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java similarity index 91% rename from pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java rename to pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java index 1021477..ac6c38d 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LowWaterMarkQueryWriterTest.java +++ b/pinot-grigio/pinot-grigio-provided/src/test/java/org/apache/pinot/broker/upsert/UpsertQueryRewriterImplTest.java @@ -16,21 +16,34 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.broker.requesthandler; +package org.apache.pinot.broker.upsert; +import org.apache.pinot.broker.requesthandler.LowWaterMarkQueryWriter; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.FilterOperator; import org.apache.pinot.common.request.FilterQuery; +import org.apache.pinot.core.segment.updater.LowWaterMarkService; import org.apache.pinot.pql.parsers.Pql2Compiler; import org.apache.thrift.TException; import org.testng.Assert; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.util.Collections; import java.util.HashMap; import java.util.Map; -public class LowWaterMarkQueryWriterTest { +public class UpsertQueryRewriterImplTest { + + private LowWaterMarkService _lwms; + private UpsertQueryRewriterImpl rewriter; + + @BeforeClass + public void init() { + _lwms = new PollingBasedLowWaterMarkService(); + rewriter = new UpsertQueryRewriterImpl(_lwms); + } + @Test public void testRewriteQueryWithoutExistingFilters() throws Exception{ Pql2Compiler pql2Compiler = new Pql2Compiler(); @@ -39,7 +52,7 @@ public class LowWaterMarkQueryWriterTest { Map<Integer, Long> lwms = new HashMap<>(); lwms.put(0, 10L); lwms.put(1, 20L); - LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms); + rewriter.addLowWaterMarkToQuery(req, lwms); Assert.assertTrue(req.isSetFilterQuery()); try { req.validate(); @@ -80,7 +93,7 @@ public class LowWaterMarkQueryWriterTest { Map<Integer, Long> lwms = new HashMap<>(); lwms.put(0, 10L); lwms.put(1, 20L); - LowWaterMarkQueryWriter.addLowWaterMarkToQuery(req, lwms); + rewriter.addLowWaterMarkToQuery(req, lwms); Assert.assertTrue(req.isSetFilterQuery()); try { req.validate(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
