This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ada54f1 [QueryContext] Refactor PlanMaker to use QueryContext (#5568)
ada54f1 is described below
commit ada54f11ac2515340cfc5c7f8ebcf66ec3069be3
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jun 17 16:03:52 2020 -0700
[QueryContext] Refactor PlanMaker to use QueryContext (#5568)
Also addressed the TODO to directly use List<IndexSegment> instead of
List<SegmentDataManager> for the instance level Plan
Modified and simplified tests to apply the interface change
---
.../core/plan/maker/InstancePlanMakerImplV2.java | 153 ++++-----
.../apache/pinot/core/plan/maker/PlanMaker.java | 27 +-
.../query/executor/ServerQueryExecutorV1Impl.java | 10 +-
...adataAndDictionaryAggregationPlanMakerTest.java | 30 +-
.../pinot/queries/BaseMultiValueQueriesTest.java | 22 +-
.../org/apache/pinot/queries/BaseQueriesTest.java | 188 +++++------
.../pinot/queries/BaseSingleValueQueriesTest.java | 25 +-
.../queries/DistinctCountThetaSketchTest.java | 39 +--
.../apache/pinot/queries/DistinctQueriesTest.java | 42 ++-
.../apache/pinot/queries/FastHllQueriesTest.java | 11 +-
.../queries/PercentileTDigestQueriesTest.java | 32 +-
.../RangePredicateWithSortedInvertedIndexTest.java | 249 +++++++-------
.../queries/SelectionOnlyEarlyTerminationTest.java | 6 +-
.../pinot/queries/SerializedBytesQueriesTest.java | 28 +-
.../pinot/queries/TextSearchQueriesTest.java | 115 +++----
.../apache/pinot/queries/TransformQueriesTest.java | 359 ++++++++-------------
16 files changed, 561 insertions(+), 775 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 2103115..4b388a3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -23,10 +23,7 @@ import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.request.AggregationInfo;
import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode;
import org.apache.pinot.core.plan.AggregationGroupByPlanNode;
@@ -39,8 +36,11 @@ import
org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.core.plan.SelectionPlanNode;
-import
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.config.QueryExecutorConfig;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
import org.apache.pinot.core.segment.index.readers.Dictionary;
import org.apache.pinot.core.util.QueryOptions;
import org.slf4j.Logger;
@@ -95,10 +95,26 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
}
@Override
- public PlanNode makeInnerSegmentPlan(IndexSegment indexSegment,
BrokerRequest brokerRequest) {
- if (brokerRequest.isSetAggregationsInfo()) {
- if (brokerRequest.isSetGroupBy()) {
- QueryOptions queryOptions = new
QueryOptions(brokerRequest.getQueryOptions());
+ public Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext
queryContext,
+ ExecutorService executorService, long timeOutMs) {
+ List<PlanNode> planNodes = new ArrayList<>(indexSegments.size());
+ for (IndexSegment indexSegment : indexSegments) {
+ planNodes.add(makeSegmentPlanNode(indexSegment, queryContext));
+ }
+ CombinePlanNode combinePlanNode =
+ new CombinePlanNode(planNodes, queryContext.getBrokerRequest(),
executorService, timeOutMs, _numGroupsLimit);
+ return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
+ }
+
+ @Override
+ public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext
queryContext) {
+ BrokerRequest brokerRequest = queryContext.getBrokerRequest();
+ if (QueryContextUtils.isAggregationQuery(queryContext)) {
+ // Aggregation query
+ List<ExpressionContext> groupByExpressions =
queryContext.getGroupByExpressions();
+ if (groupByExpressions != null) {
+ // Aggregation group-by query
+ QueryOptions queryOptions = new
QueryOptions(queryContext.getQueryOptions());
// new Combine operator only when GROUP_BY_MODE explicitly set to SQL
if (queryOptions.isGroupByModeSQL()) {
return new AggregationGroupByOrderByPlanNode(indexSegment,
brokerRequest, _maxInitialResultHolderCapacity,
@@ -107,109 +123,62 @@ public class InstancePlanMakerImplV2 implements
PlanMaker {
return new AggregationGroupByPlanNode(indexSegment, brokerRequest,
_maxInitialResultHolderCapacity,
_numGroupsLimit);
} else {
- if (isFitForMetadataBasedPlan(brokerRequest, indexSegment)) {
- return new MetadataBasedAggregationPlanNode(indexSegment,
brokerRequest);
- } else if (isFitForDictionaryBasedPlan(brokerRequest, indexSegment)) {
- return new DictionaryBasedAggregationPlanNode(indexSegment,
brokerRequest);
- } else {
- return new AggregationPlanNode(indexSegment, brokerRequest);
+ // Aggregation only query
+ if (queryContext.getFilter() == null) {
+ if (isFitForMetadataBasedPlan(queryContext)) {
+ return new MetadataBasedAggregationPlanNode(indexSegment,
brokerRequest);
+ } else if (isFitForDictionaryBasedPlan(queryContext, indexSegment)) {
+ return new DictionaryBasedAggregationPlanNode(indexSegment,
brokerRequest);
+ }
}
+ return new AggregationPlanNode(indexSegment, brokerRequest);
}
- }
- if (brokerRequest.isSetSelections()) {
+ } else {
+ // Selection query
return new SelectionPlanNode(indexSegment, brokerRequest);
}
- throw new UnsupportedOperationException("The query contains no aggregation
or selection.");
- }
-
- @Override
- public Plan makeInterSegmentPlan(List<SegmentDataManager>
segmentDataManagers, BrokerRequest brokerRequest,
- ExecutorService executorService, long timeOutMs) {
- // TODO: pass in List<IndexSegment> directly.
- List<IndexSegment> indexSegments = new
ArrayList<>(segmentDataManagers.size());
- for (SegmentDataManager segmentDataManager : segmentDataManagers) {
- indexSegments.add(segmentDataManager.getSegment());
- }
-
- List<PlanNode> planNodes = new ArrayList<>();
- for (IndexSegment indexSegment : indexSegments) {
- planNodes.add(makeInnerSegmentPlan(indexSegment, brokerRequest));
- }
- CombinePlanNode combinePlanNode =
- new CombinePlanNode(planNodes, brokerRequest, executorService,
timeOutMs, _numGroupsLimit);
-
- return new GlobalPlanImplV0(new InstanceResponsePlanNode(combinePlanNode));
}
/**
- * Helper method to identify if query is fit to be be served purely based on
metadata.
- * Currently count queries without any filters are supported.
- * The code for supporting max and min is also in place, but disabled
- * It would have worked only for time columns and offline and non star tree
cases.
- *
- * @param brokerRequest Broker request
- * @param indexSegment
- * @return True if query can be served using metadata, false otherwise.
+ * Returns {@code true} if the given aggregation-only without filter
QueryContext can be solved with segment metadata,
+ * {@code false} otherwise.
+ * <p>Aggregations supported: COUNT
*/
- public static boolean isFitForMetadataBasedPlan(BrokerRequest brokerRequest,
IndexSegment indexSegment) {
- if (brokerRequest.getFilterQuery() != null ||
brokerRequest.isSetGroupBy()) {
- return false;
- }
-
- List<AggregationInfo> aggregationsInfo =
brokerRequest.getAggregationsInfo();
- if (aggregationsInfo == null) {
- return false;
- }
- for (AggregationInfo aggInfo : aggregationsInfo) {
- if (!isMetadataBasedAggregationFunction(aggInfo, indexSegment)) {
+ @VisibleForTesting
+ static boolean isFitForMetadataBasedPlan(QueryContext queryContext) {
+ List<ExpressionContext> selectExpressions =
queryContext.getSelectExpressions();
+ for (ExpressionContext expression : selectExpressions) {
+ if (!expression.getFunction().getFunctionName().equals("count")) {
return false;
}
}
return true;
}
- private static boolean isMetadataBasedAggregationFunction(AggregationInfo
aggregationInfo,
- IndexSegment indexSegment) {
- return
AggregationFunctionType.getAggregationFunctionType(aggregationInfo.getAggregationType())
- == AggregationFunctionType.COUNT;
- }
-
/**
- * Helper method to identify if query is fit to be be served purely based on
dictionary.
- * It can be served through dictionary only for min, max, minmaxrange
queries as of now,
- * and if a dictionary is present for the column
- * @param brokerRequest Broker request
- * @param indexSegment
- * @return True if query can be served using dictionary, false otherwise.
+ * Returns {@code true} if the given aggregation-only without filter
QueryContext can be solved with dictionary,
+ * {@code false} otherwise.
+ * <p>Aggregations supported: MIN, MAX, MINMAXRANGE
*/
- public static boolean isFitForDictionaryBasedPlan(BrokerRequest
brokerRequest, IndexSegment indexSegment) {
- if ((brokerRequest.getFilterQuery() != null) ||
brokerRequest.isSetGroupBy()) {
- return false;
- }
- List<AggregationInfo> aggregationsInfo =
brokerRequest.getAggregationsInfo();
- if (aggregationsInfo == null) {
- return false;
- }
- for (AggregationInfo aggregationInfo : aggregationsInfo) {
- if (!isDictionaryBasedAggregationFunction(aggregationInfo,
indexSegment)) {
+ @VisibleForTesting
+ static boolean isFitForDictionaryBasedPlan(QueryContext queryContext,
IndexSegment indexSegment) {
+ List<ExpressionContext> selectExpressions =
queryContext.getSelectExpressions();
+ for (ExpressionContext expression : selectExpressions) {
+ FunctionContext function = expression.getFunction();
+ String functionName = function.getFunctionName();
+ if (!functionName.equals("min") && !functionName.equals("max") &&
!functionName.equals("minmaxrange")) {
return false;
}
- }
- return true;
- }
-
- private static boolean isDictionaryBasedAggregationFunction(AggregationInfo
aggregationInfo,
- IndexSegment indexSegment) {
- AggregationFunctionType functionType =
-
AggregationFunctionType.getAggregationFunctionType(aggregationInfo.getAggregationType());
- if (functionType
- .isOfType(AggregationFunctionType.MIN, AggregationFunctionType.MAX,
AggregationFunctionType.MINMAXRANGE)) {
- String column =
AggregationFunctionUtils.getArguments(aggregationInfo).get(0);
- if (indexSegment.getColumnNames().contains(column)) {
- Dictionary dictionary =
indexSegment.getDataSource(column).getDictionary();
- return dictionary != null && dictionary.isSorted();
+ ExpressionContext argument = function.getArguments().get(0);
+ if (argument.getType() != ExpressionContext.Type.IDENTIFIER) {
+ return false;
+ }
+ String column = argument.getIdentifier();
+ Dictionary dictionary =
indexSegment.getDataSource(column).getDictionary();
+ if (dictionary == null || !dictionary.isSorted()) {
+ return false;
}
}
- return false;
+ return true;
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
index 9170616..553a6d8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/PlanMaker.java
@@ -20,36 +20,27 @@ package org.apache.pinot.core.plan.maker;
import java.util.List;
import java.util.concurrent.ExecutorService;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.PlanNode;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
/**
- * The <code>PlanMaker</code> provides interfaces to make segment level and
instance level execution plan.
+ * The {@code PlanMaker} makes logical execution plan for the queries.
*/
+@InterfaceAudience.Private
public interface PlanMaker {
/**
- * Make segment level {@link PlanNode} which contains execution plan on only
one segment.
- *
- * @param indexSegment index segment.
- * @param brokerRequest broker request.
- * @return segment level plan node.
+ * Returns an instance level {@link Plan} which contains the logical
execution plan for multiple segments.
*/
- PlanNode makeInnerSegmentPlan(IndexSegment indexSegment, BrokerRequest
brokerRequest);
+ Plan makeInstancePlan(List<IndexSegment> indexSegments, QueryContext
queryContext, ExecutorService executorService,
+ long timeoutMs);
/**
- * Make instance level {@link Plan} which contains execution plan on
multiple segments.
- *
- * @param segmentDataManagers list of segment data manager.
- * @param brokerRequest broker request.
- * @param executorService executor service.
- * @param timeOutMs time out in milliseconds.
- * @return instance level plan.
+ * Returns a segment level {@link PlanNode} which contains the logical
execution plan for one segment.
*/
- Plan makeInterSegmentPlan(List<SegmentDataManager> segmentDataManagers,
BrokerRequest brokerRequest,
- ExecutorService executorService, long timeOutMs);
+ PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext
queryContext);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index b420f5d..2f8be38 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.query.executor;
import com.google.common.base.Preconditions;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -204,9 +205,12 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
} else {
TimerContext.Timer planBuildTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
- Plan globalQueryPlan = _planMaker
- .makeInterSegmentPlan(segmentDataManagers,
queryContext.getBrokerRequest(), executorService,
- remainingTimeMs);
+ List<IndexSegment> indexSegments = new
ArrayList<>(numSegmentsMatchedAfterPruning);
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ indexSegments.add(segmentDataManager.getSegment());
+ }
+ Plan globalQueryPlan =
+ _planMaker.makeInstancePlan(indexSegments, queryContext,
executorService, remainingTimeMs);
planBuildTimer.stopAndRecord();
TimerContext.Timer planExecTimer =
timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 13d2088..591cba0 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -36,6 +36,8 @@ import
org.apache.pinot.core.plan.DictionaryBasedAggregationPlanNode;
import org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode;
import org.apache.pinot.core.plan.PlanNode;
import org.apache.pinot.core.plan.SelectionPlanNode;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.pql.parsers.Pql2Compiler;
@@ -46,7 +48,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeClass;
@@ -54,6 +55,10 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
public class MetadataAndDictionaryAggregationPlanMakerTest {
private static final String AVRO_DATA = "data" + File.separator +
"test_data-sv.avro";
@@ -72,7 +77,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
// Get resource file path.
URL resource = getClass().getClassLoader().getResource(AVRO_DATA);
- Assert.assertNotNull(resource);
+ assertNotNull(resource);
String filePath = resource.getFile();
// Build the segment schema.
@@ -128,8 +133,9 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
@Test(dataProvider = "testPlanNodeMakerDataProvider")
public void testInstancePlanMakerForMetadataAndDictionaryPlan(String query,
Class<? extends PlanNode> planNodeClass) {
BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(query);
- PlanNode plan = PLAN_MAKER.makeInnerSegmentPlan(_indexSegment,
brokerRequest);
- Assert.assertTrue(planNodeClass.isInstance(plan));
+ QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+ PlanNode plan = PLAN_MAKER.makeSegmentPlanNode(_indexSegment,
queryContext);
+ assertTrue(planNodeClass.isInstance(plan));
}
@DataProvider(name = "testPlanNodeMakerDataProvider")
@@ -171,18 +177,15 @@ public class
MetadataAndDictionaryAggregationPlanMakerTest {
public void testIsFitFor(String query, IndexSegment indexSegment, boolean
expectedIsFitForMetadata,
boolean expectedIsFitForDictionary) {
BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(query);
-
- boolean isFitForMetadataBasedPlan =
InstancePlanMakerImplV2.isFitForMetadataBasedPlan(brokerRequest, indexSegment);
- boolean isFitForDictionaryBasedPlan =
- InstancePlanMakerImplV2.isFitForDictionaryBasedPlan(brokerRequest,
indexSegment);
- Assert.assertEquals(isFitForMetadataBasedPlan, expectedIsFitForMetadata);
- Assert.assertEquals(isFitForDictionaryBasedPlan,
expectedIsFitForDictionary);
+ QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+
assertEquals(InstancePlanMakerImplV2.isFitForMetadataBasedPlan(queryContext),
expectedIsFitForMetadata);
+
assertEquals(InstancePlanMakerImplV2.isFitForDictionaryBasedPlan(queryContext,
indexSegment),
+ expectedIsFitForDictionary);
}
@DataProvider(name = "isFitForPlanDataProvider")
public Object[][] provideDataForIsFitChecks() {
List<Object[]> entries = new ArrayList<>();
- entries.add(new Object[]{"select * from testTable", _indexSegment, false,
false});
entries.add(
new Object[]{"select count(*) from testTable", _indexSegment, true,
false /* count* from metadata, even if star tree present */});
entries.add(
@@ -192,11 +195,6 @@ public class MetadataAndDictionaryAggregationPlanMakerTest
{
entries.add(
new Object[]{"select count(*),max(daysSinceEpoch) from testTable",
_indexSegment, false, false /* count* and max(time) from metadata*/});
entries.add(new Object[]{"select sum(column1) from testTable",
_indexSegment, false, false});
- entries.add(new Object[]{"select count(*) from testTable group by
daysSinceEpoch", _indexSegment, false, false});
- entries.add(new Object[]{"select count(*) from testTable where
daysSinceEpoch > 1", _indexSegment, false, false});
- entries.add(
- new Object[]{"select max(column5) from testTable where daysSinceEpoch
> 100", _indexSegment, false, false});
- entries.add(new Object[]{"select column1 from testTable", _indexSegment,
false, false});
return entries.toArray(new Object[entries.size()][]);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseMultiValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseMultiValueQueriesTest.java
index add44b5..e08b4f0 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseMultiValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseMultiValueQueriesTest.java
@@ -24,19 +24,17 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
@@ -76,7 +74,7 @@ public abstract class BaseMultiValueQueriesTest extends
BaseQueriesTest {
private IndexSegment _indexSegment;
// Contains 2 identical index segments.
- private List<SegmentDataManager> _segmentDataManagers;
+ private List<IndexSegment> _indexSegments;
@BeforeTest
public void buildSegment()
@@ -96,8 +94,7 @@ public abstract class BaseMultiValueQueriesTest extends
BaseQueriesTest {
.addMultiValueDimension("column7", FieldSpec.DataType.INT)
.addSingleValueDimension("column8",
FieldSpec.DataType.INT).addMetric("column9", FieldSpec.DataType.INT)
.addMetric("column10", FieldSpec.DataType.INT)
- .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT,
TimeUnit.DAYS, "daysSinceEpoch"), null)
- .build();
+ .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT,
TimeUnit.DAYS, "daysSinceEpoch"), null).build();
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
@@ -125,8 +122,7 @@ public abstract class BaseMultiValueQueriesTest extends
BaseQueriesTest {
throws Exception {
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
_indexSegment = immutableSegment;
- _segmentDataManagers = Arrays
- .asList(new ImmutableSegmentDataManager(immutableSegment), new
ImmutableSegmentDataManager(immutableSegment));
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
@AfterClass
@@ -150,7 +146,7 @@ public abstract class BaseMultiValueQueriesTest extends
BaseQueriesTest {
}
@Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index 6b012d3..07db200 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -23,17 +23,20 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import javax.annotation.Nullable;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
import org.apache.pinot.common.utils.CommonConstants.Server;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.Operator;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.plan.Plan;
import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
import org.apache.pinot.core.plan.maker.PlanMaker;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.spi.config.table.TableType;
@@ -53,183 +56,138 @@ public abstract class BaseQueriesTest {
protected abstract IndexSegment getIndexSegment();
- protected abstract List<SegmentDataManager> getSegmentDataManagers();
+ protected abstract List<IndexSegment> getIndexSegments();
/**
- * Run query on single index segment.
+ * Run PQL query on single index segment.
* <p>Use this to test a single operator.
- *
- * @param query PQL query.
- * @return query operator.
*/
- @SuppressWarnings("unchecked")
- protected <T extends Operator> T getOperatorForQuery(String query) {
- return (T) PLAN_MAKER.makeInnerSegmentPlan(getIndexSegment(),
PQL_COMPILER.compileToBrokerRequest(query)).run();
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ protected <T extends Operator> T getOperatorForQuery(String pqlQuery) {
+ BrokerRequest brokerRequest =
PQL_COMPILER.compileToBrokerRequest(pqlQuery);
+ QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+ return (T) PLAN_MAKER.makeSegmentPlanNode(getIndexSegment(),
queryContext).run();
}
/**
- * Run query with hard-coded filter on single index segment.
+ * Run PQL query with hard-coded filter on single index segment.
* <p>Use this to test a single operator.
- *
- * @param query PQL query without any filter.
- * @return query operator.
*/
- protected <T extends Operator> T getOperatorForQueryWithFilter(String query)
{
- return getOperatorForQuery(query + getFilter());
+ @SuppressWarnings("rawtypes")
+ protected <T extends Operator> T getOperatorForQueryWithFilter(String
pqlQuery) {
+ return getOperatorForQuery(pqlQuery + getFilter());
}
/**
- * Run query on multiple index segments with custom plan maker.
- * <p>Use this to test the whole flow from server to broker.
- * <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param query PQL query.
- * @param planMaker Plan maker.
- * @return broker response.
- */
- protected BrokerResponseNative getBrokerResponseForPqlQuery(String query,
PlanMaker planMaker) {
- return getBrokerResponseForPqlQuery(query, planMaker, null);
- }
-
- /**
- * Run query on multiple index segments with custom plan maker and
queryOptions.
+ * Run PQL query on multiple index segments.
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param query PQL query.
- * @param planMaker Plan maker.
- * @return broker response.
*/
- private BrokerResponseNative getBrokerResponseForPqlQuery(String query,
PlanMaker planMaker,
- Map<String, String> queryOptions) {
- BrokerRequest brokerRequest = PQL_COMPILER.compileToBrokerRequest(query);
- return getBrokerResponseForBrokerRequest(brokerRequest, planMaker,
queryOptions);
+ protected BrokerResponseNative getBrokerResponseForPqlQuery(String pqlQuery)
{
+ return getBrokerResponseForPqlQuery(pqlQuery, PLAN_MAKER);
}
/**
- * Run query on multiple index segments.
+ * Run PQL query with hard-coded filter on multiple index segments.
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param query SQL query.
- * @return broker response.
*/
- protected BrokerResponseNative getBrokerResponseForSqlQuery(String query) {
- return getBrokerResponseForSqlQuery(query, PLAN_MAKER);
+ protected BrokerResponseNative getBrokerResponseForPqlQueryWithFilter(String
pqlQuery) {
+ return getBrokerResponseForPqlQuery(pqlQuery + getFilter());
}
/**
- * Run query on multiple index segments with custom plan maker.
+ * Run PQL query on multiple index segments with custom plan maker.
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param query SQL query.
- * @param planMaker Plan maker.
- * @return broker response.
*/
- protected BrokerResponseNative getBrokerResponseForSqlQuery(String query,
PlanMaker planMaker) {
- HashMap<String, String> queryOptions = new HashMap<>();
- queryOptions.put("groupByMode", "sql");
- queryOptions.put("responseFormat", "sql");
- return getBrokerResponseForSqlQuery(query, planMaker, queryOptions);
+ protected BrokerResponseNative getBrokerResponseForPqlQuery(String pqlQuery,
PlanMaker planMaker) {
+ return getBrokerResponseForPqlQuery(pqlQuery, planMaker, null);
}
/**
- * Run query on multiple index segments with custom plan maker and
queryOptions.
+ * Run PQL query on multiple index segments.
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param query SQL query.
- * @param planMaker Plan maker.
- * @return broker response.
*/
- private BrokerResponseNative getBrokerResponseForSqlQuery(String query,
PlanMaker planMaker,
- Map<String, String> queryOptions) {
- BrokerRequest brokerRequest = SQL_COMPILER.compileToBrokerRequest(query);
- return getBrokerResponseForBrokerRequest(brokerRequest, planMaker,
queryOptions);
+ protected BrokerResponseNative getBrokerResponseForPqlQuery(String pqlQuery,
+ @Nullable Map<String, String> extraQueryOptions) {
+ return getBrokerResponseForPqlQuery(pqlQuery, PLAN_MAKER,
extraQueryOptions);
}
/**
- * Run query on multiple index segments with custom plan maker and
queryOptions.
+ * Run PQL query on multiple index segments with custom plan maker and
queryOptions.
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param brokerRequest broker request.
- * @param planMaker Plan maker.
- * @return broker response.
*/
- private BrokerResponseNative getBrokerResponseForBrokerRequest(BrokerRequest
brokerRequest, PlanMaker planMaker,
- Map<String, String> queryOptions) {
- Map<String, String> allQueryOptions = new HashMap<>();
-
- if (queryOptions != null) {
- allQueryOptions.putAll(queryOptions);
- }
- if (brokerRequest.getQueryOptions() != null) {
- allQueryOptions.putAll(brokerRequest.getQueryOptions());
+ private BrokerResponseNative getBrokerResponseForPqlQuery(String pqlQuery,
PlanMaker planMaker,
+ @Nullable Map<String, String> extraQueryOptions) {
+ BrokerRequest brokerRequest =
PQL_COMPILER.compileToBrokerRequest(pqlQuery);
+ if (extraQueryOptions != null) {
+ Map<String, String> queryOptions = brokerRequest.getQueryOptions();
+ if (queryOptions != null) {
+ queryOptions.putAll(extraQueryOptions);
+ } else {
+ brokerRequest.setQueryOptions(extraQueryOptions);
+ }
}
- if (!allQueryOptions.isEmpty()) {
- brokerRequest.setQueryOptions(allQueryOptions);
- }
-
- // Server side.
- Plan plan = planMaker.makeInterSegmentPlan(getSegmentDataManagers(),
brokerRequest, EXECUTOR_SERVICE,
- Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
- DataTable instanceResponse = plan.execute();
-
- // Broker side.
- BrokerReduceService brokerReduceService = new BrokerReduceService();
- Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
- dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.OFFLINE), instanceResponse);
- dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.REALTIME), instanceResponse);
- return brokerReduceService.reduceOnDataTable(brokerRequest, dataTableMap,
null);
+ QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+ return getBrokerResponse(queryContext, planMaker);
}
/**
- * Run query on multiple index segments.
+ * Run SQL query on multiple index segments.
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param query PQL query.
- * @return broker response.
*/
- protected BrokerResponseNative getBrokerResponseForPqlQuery(String query) {
- return getBrokerResponseForPqlQuery(query, PLAN_MAKER);
+ protected BrokerResponseNative getBrokerResponseForSqlQuery(String sqlQuery)
{
+ return getBrokerResponseForSqlQuery(sqlQuery, PLAN_MAKER);
}
/**
- * Run query on multiple index segments.
+ * Run SQL query with hard-coded filter on multiple index segments.
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param query PQL query.
- * @return broker response.
*/
- protected BrokerResponseNative getBrokerResponseForPqlQuery(String query,
Map<String, String> queryOptions) {
- return getBrokerResponseForPqlQuery(query, PLAN_MAKER, queryOptions);
+ protected BrokerResponseNative getBrokerResponseForSqlQueryWithFilter(String
sqlQuery) {
+ return getBrokerResponseForSqlQuery(sqlQuery + getFilter());
}
/**
- * Run query with hard-coded filter on multiple index segments.
+ * Run SQL query on multiple index segments with custom plan maker.
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param query PQL query without any filter.
- * @return broker response.
*/
- protected BrokerResponseNative getBrokerResponseForPqlQueryWithFilter(String
query) {
- return getBrokerResponseForPqlQuery(query + getFilter());
+ @SuppressWarnings("SameParameterValue")
+ protected BrokerResponseNative getBrokerResponseForSqlQuery(String sqlQuery,
PlanMaker planMaker) {
+ BrokerRequest brokerRequest =
SQL_COMPILER.compileToBrokerRequest(sqlQuery);
+ Map<String, String> queryOptions = brokerRequest.getQueryOptions();
+ if (queryOptions == null) {
+ queryOptions = new HashMap<>();
+ brokerRequest.setQueryOptions(queryOptions);
+ }
+ queryOptions.put(Request.QueryOptionKey.GROUP_BY_MODE, Request.SQL);
+ queryOptions.put(Request.QueryOptionKey.RESPONSE_FORMAT, Request.SQL);
+ QueryContext queryContext =
BrokerRequestToQueryContextConverter.convert(brokerRequest);
+ return getBrokerResponse(queryContext, planMaker);
}
/**
- * Run query with hard-coded filter on multiple index segments.
+ * Run query on multiple index segments with custom plan maker.
* <p>Use this to test the whole flow from server to broker.
* <p>The result should be equivalent to querying 4 identical index segments.
- *
- * @param query SQL query without any filter.
- * @return broker response.
*/
- protected BrokerResponseNative getBrokerResponseForSqlQueryWithFilter(String
query) {
- return getBrokerResponseForSqlQuery(query + getFilter());
+ private BrokerResponseNative getBrokerResponse(QueryContext queryContext,
PlanMaker planMaker) {
+ // Server side.
+ Plan plan = planMaker
+ .makeInstancePlan(getIndexSegments(), queryContext, EXECUTOR_SERVICE,
Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS);
+ DataTable instanceResponse = plan.execute();
+
+ // Broker side.
+ BrokerReduceService brokerReduceService = new BrokerReduceService();
+ Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+ dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.OFFLINE), instanceResponse);
+ dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.REALTIME), instanceResponse);
+ return
brokerReduceService.reduceOnDataTable(queryContext.getBrokerRequest(),
dataTableMap, null);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
index e46e87f..4ec7fbc 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/BaseSingleValueQueriesTest.java
@@ -26,8 +26,6 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -70,7 +68,7 @@ public abstract class BaseSingleValueQueriesTest extends
BaseQueriesTest {
private static final String AVRO_DATA = "data" + File.separator +
"test_data-sv.avro";
private static final String SEGMENT_NAME = "testTable_126164076_167572854";
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"SingleValueQueriesTest");
- private static final int NUM_SEGMENT_DATA_MANAGERS = 2;
+ private static final int NUM_SEGMENTS = 2;
// Hard-coded query filter.
private static final String QUERY_FILTER =
@@ -79,7 +77,7 @@ public abstract class BaseSingleValueQueriesTest extends
BaseQueriesTest {
private IndexSegment _indexSegment;
// Contains 2 identical index segments.
- private List<SegmentDataManager> _segmentDataManagers;
+ private List<IndexSegment> _indexSegments;
@BeforeTest
public void buildSegment()
@@ -100,8 +98,7 @@ public abstract class BaseSingleValueQueriesTest extends
BaseQueriesTest {
.addSingleValueDimension("column11", FieldSpec.DataType.STRING)
.addSingleValueDimension("column12",
FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
.addMetric("column18", FieldSpec.DataType.INT)
- .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT,
TimeUnit.DAYS, "daysSinceEpoch"), null)
- .build();
+ .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT,
TimeUnit.DAYS, "daysSinceEpoch"), null).build();
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
@@ -130,15 +127,15 @@ public abstract class BaseSingleValueQueriesTest extends
BaseQueriesTest {
throws Exception {
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
_indexSegment = immutableSegment;
- int numSegmentDataManagers = getNumSegmentDataManagers();
- _segmentDataManagers = new ArrayList<>(numSegmentDataManagers);
- for (int i = 0; i < numSegmentDataManagers; i++) {
- _segmentDataManagers.add(new
ImmutableSegmentDataManager(immutableSegment));
+ int numSegments = getNumSegments();
+ _indexSegments = new ArrayList<>(numSegments);
+ for (int i = 0; i < numSegments; i++) {
+ _indexSegments.add(immutableSegment);
}
}
- protected int getNumSegmentDataManagers() {
- return NUM_SEGMENT_DATA_MANAGERS;
+ protected int getNumSegments() {
+ return NUM_SEGMENTS;
}
@AfterClass
@@ -162,7 +159,7 @@ public abstract class BaseSingleValueQueriesTest extends
BaseQueriesTest {
}
@Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
index 6b805df..5ca02e1 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctCountThetaSketchTest.java
@@ -24,7 +24,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import joptsimple.internal.Strings;
import org.apache.commons.io.FileUtils;
@@ -37,8 +36,6 @@ import
org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.GroupByResult;
import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -68,21 +65,20 @@ import org.testng.annotations.Test;
* </ul>
*/
public class DistinctCountThetaSketchTest extends BaseQueriesTest {
- protected static final File INDEX_DIR = new
File(FileUtils.getTempDirectory(), "DistinctCountThetaSketchTest");
- protected static final String TABLE_NAME = "testTable";
- protected static final String SEGMENT_NAME = "testSegment";
-
- protected static final int NUM_ROWS = 1001;
- protected static final long RANDOM_SEED = System.nanoTime();
-
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"DistinctCountThetaSketchTest");
+ private static final String TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
private static final String THETA_SKETCH_COLUMN = "colTS";
private static final String DISTINCT_COLUMN = "distinctColumn";
- private static Random RANDOM = new Random(RANDOM_SEED);
- protected static final int MAX_CARDINALITY = 5; // 3 columns will lead to at
most 125 groups
+ private static final int NUM_ROWS = 1001;
+ private static final int MAX_CARDINALITY = 5; // 3 columns will lead to at
most 125 groups
+
+ private static final long RANDOM_SEED = System.nanoTime();
+ private static final Random RANDOM = new Random(RANDOM_SEED);
- private ImmutableSegment _indexSegment;
- private List<SegmentDataManager> _segmentDataManagers;
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
@BeforeClass
public void setup()
@@ -90,9 +86,9 @@ public class DistinctCountThetaSketchTest extends
BaseQueriesTest {
FileUtils.deleteQuietly(INDEX_DIR);
File segmentFile = buildSegment(buildSchema());
- _indexSegment = ImmutableSegmentLoader.load(segmentFile, ReadMode.mmap);
- _segmentDataManagers =
- Arrays.asList(new ImmutableSegmentDataManager(_indexSegment), new
ImmutableSegmentDataManager(_indexSegment));
+ ImmutableSegment immutableSegment =
ImmutableSegmentLoader.load(segmentFile, ReadMode.mmap);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
@AfterClass
@@ -179,12 +175,11 @@ public class DistinctCountThetaSketchTest extends
BaseQueriesTest {
}
private void testQuery(String tsQuery, String distinctQuery, boolean
groupBy, boolean sql, boolean raw) {
- Map<String, String> queryOptions = Collections.emptyMap();
BrokerResponseNative actualResponse =
- (sql) ? getBrokerResponseForSqlQuery(tsQuery) :
getBrokerResponseForPqlQuery(tsQuery, queryOptions);
+ (sql) ? getBrokerResponseForSqlQuery(tsQuery) :
getBrokerResponseForPqlQuery(tsQuery);
BrokerResponseNative expectedResponse =
- (sql) ? getBrokerResponseForSqlQuery(distinctQuery) :
getBrokerResponseForPqlQuery(distinctQuery, queryOptions);
+ (sql) ? getBrokerResponseForSqlQuery(distinctQuery) :
getBrokerResponseForPqlQuery(distinctQuery);
if (groupBy) {
compareGroupBy(actualResponse, expectedResponse, sql, raw);
@@ -328,8 +323,8 @@ public class DistinctCountThetaSketchTest extends
BaseQueriesTest {
}
@Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
}
protected File buildSegment(Schema schema)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 85b39ba..a44d0b7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -40,8 +40,6 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.indexsegment.IndexSegment;
@@ -51,6 +49,8 @@ import
org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.core.operator.query.AggregationOperator;
import
org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
import org.apache.pinot.core.query.reduce.BrokerReduceService;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -95,7 +95,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
private IndexSegment _indexSegment;
- private List<SegmentDataManager> _segmentDataManagers;
+ private List<IndexSegment> _indexSegments;
@Override
protected String getFilter() {
@@ -108,8 +108,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
}
@Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
}
@BeforeClass
@@ -351,8 +351,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
throws Exception {
ImmutableSegment segment0 = createSegment(0, generateRecords(0));
ImmutableSegment segment1 = createSegment(1, generateRecords(1000));
- _segmentDataManagers =
- Arrays.asList(new ImmutableSegmentDataManager(segment0), new
ImmutableSegmentDataManager(segment1));
+ _indexSegments = Arrays.asList(segment0, segment1);
try {
{
// Test selecting all columns
@@ -605,10 +604,12 @@ public class DistinctQueriesTest extends BaseQueriesTest {
"SELECT DISTINCT longColumn FROM testTable WHERE doubleColumn <
200 ORDER BY longColumn DESC LIMIT 5";
BrokerRequest pqlBrokerRequest =
PQL_COMPILER.compileToBrokerRequest(pqlQuery);
- BrokerResponseNative pqlResponse =
queryServersWithDifferentSegments(pqlBrokerRequest, segment0, segment1);
+ QueryContext pqlQueryContext =
BrokerRequestToQueryContextConverter.convert(pqlBrokerRequest);
+ BrokerResponseNative pqlResponse =
queryServersWithDifferentSegments(pqlQueryContext, segment0, segment1);
BrokerRequest sqlBrokerRequest =
SQL_COMPILER.compileToBrokerRequest(sqlQuery);
sqlBrokerRequest.setQueryOptions(Collections.singletonMap("responseFormat",
"sql"));
- BrokerResponseNative sqlResponse =
queryServersWithDifferentSegments(sqlBrokerRequest, segment0, segment1);
+ QueryContext sqlQueryContext =
BrokerRequestToQueryContextConverter.convert(sqlBrokerRequest);
+ BrokerResponseNative sqlResponse =
queryServersWithDifferentSegments(sqlQueryContext, segment0, segment1);
// Check data schema
SelectionResults selectionResults = pqlResponse.getSelectionResults();
@@ -636,8 +637,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
}
}
} finally {
- for (SegmentDataManager segmentDataManager : _segmentDataManagers) {
- segmentDataManager.destroy();
+ for (IndexSegment indexSegment : _indexSegments) {
+ indexSegment.destroy();
}
}
}
@@ -646,24 +647,21 @@ public class DistinctQueriesTest extends BaseQueriesTest {
* Helper method to query 2 servers with different segments. Server0 will
have 2 copies of segment0; Server1 will have
* 2 copies of segment1.
*/
- private BrokerResponseNative queryServersWithDifferentSegments(BrokerRequest
brokerRequest, ImmutableSegment segment0,
+ private BrokerResponseNative queryServersWithDifferentSegments(QueryContext
queryContext, ImmutableSegment segment0,
ImmutableSegment segment1) {
- List<SegmentDataManager> segmentDataManagers0 =
- Arrays.asList(new ImmutableSegmentDataManager(segment0), new
ImmutableSegmentDataManager(segment0));
- List<SegmentDataManager> segmentDataManagers1 =
- Arrays.asList(new ImmutableSegmentDataManager(segment1), new
ImmutableSegmentDataManager(segment1));
-
// Server side
- DataTable instanceResponse0 =
PLAN_MAKER.makeInterSegmentPlan(segmentDataManagers0, brokerRequest,
EXECUTOR_SERVICE,
- CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
- DataTable instanceResponse1 =
PLAN_MAKER.makeInterSegmentPlan(segmentDataManagers1, brokerRequest,
EXECUTOR_SERVICE,
- CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+ DataTable instanceResponse0 = PLAN_MAKER
+ .makeInstancePlan(Arrays.asList(segment0, segment0), queryContext,
EXECUTOR_SERVICE,
+
CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
+ DataTable instanceResponse1 = PLAN_MAKER
+ .makeInstancePlan(Arrays.asList(segment1, segment1), queryContext,
EXECUTOR_SERVICE,
+
CommonConstants.Server.DEFAULT_QUERY_EXECUTOR_TIMEOUT_MS).execute();
// Broker side
BrokerReduceService brokerReduceService = new BrokerReduceService();
Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.OFFLINE), instanceResponse0);
dataTableMap.put(new ServerRoutingInstance("localhost", 1234,
TableType.REALTIME), instanceResponse1);
- return brokerReduceService.reduceOnDataTable(brokerRequest, dataTableMap,
null);
+ return
brokerReduceService.reduceOnDataTable(queryContext.getBrokerRequest(),
dataTableMap, null);
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
index 41f7b9e..386ae27 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/FastHllQueriesTest.java
@@ -27,8 +27,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -85,7 +83,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
private IndexSegment _indexSegment;
// Contains 2 identical index segments
- private List<SegmentDataManager> _segmentDataManagers;
+ private List<IndexSegment> _indexSegments;
@Override
protected String getFilter() {
@@ -98,8 +96,8 @@ public class FastHllQueriesTest extends BaseQueriesTest {
}
@Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
}
@Test
@@ -196,8 +194,7 @@ public class FastHllQueriesTest extends BaseQueriesTest {
ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
_indexSegment = immutableSegment;
- _segmentDataManagers = Arrays
- .asList(new ImmutableSegmentDataManager(immutableSegment), new
ImmutableSegmentDataManager(immutableSegment));
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
private void deleteSegment() {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
index dcae9e7..d1e7023 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/PercentileTDigestQueriesTest.java
@@ -31,21 +31,11 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.MetricFieldSpec;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.GroupByResult;
import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
-import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -57,6 +47,14 @@ import
org.apache.pinot.core.query.aggregation.function.PercentileTDigestAggrega
import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -92,8 +90,8 @@ public class PercentileTDigestQueriesTest extends
BaseQueriesTest {
protected static final Random RANDOM = new Random(RANDOM_SEED);
protected static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
- private ImmutableSegment _indexSegment;
- private List<SegmentDataManager> _segmentDataManagers;
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
@Override
protected String getFilter() {
@@ -106,8 +104,8 @@ public class PercentileTDigestQueriesTest extends
BaseQueriesTest {
}
@Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
}
@BeforeClass
@@ -116,9 +114,9 @@ public class PercentileTDigestQueriesTest extends
BaseQueriesTest {
FileUtils.deleteQuietly(INDEX_DIR);
buildSegment();
- _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
SEGMENT_NAME), ReadMode.mmap);
- _segmentDataManagers =
- Arrays.asList(new ImmutableSegmentDataManager(_indexSegment), new
ImmutableSegmentDataManager(_indexSegment));
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
protected void buildSegment()
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
index 2f34a01..99f3a90 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/RangePredicateWithSortedInvertedIndexTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.queries;
import java.io.File;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
@@ -27,7 +28,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.common.utils.Pairs;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -43,66 +43,37 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.collections.Lists;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
-public class RangePredicateWithSortedInvertedIndexTest extends BaseQueriesTest
{
- private static final int NUM_ROWS = 30000;
- private List<GenericRow> _rows = new ArrayList<>();
+public class RangePredicateWithSortedInvertedIndexTest extends BaseQueriesTest
{
+ private static final File INDEX_DIR =
+ new File(FileUtils.getTempDirectory(),
"RangePredicateWithSortedInvertedIndexTest");
+ private static final String TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
private static final String D1 = "STRING_COL";
private static final String M1 = "INT_COL"; // sorted column
private static final String M2 = "LONG_COL";
+ private static final int NUM_ROWS = 30000;
private static final int INT_BASE_VALUE = 0;
- private static final String TABLE_NAME = "TestTable";
- private static final int NUM_SEGMENTS = 1;
- private static final String SEGMENT_NAME_1 = TABLE_NAME +
"_100000000_200000000";
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"SortedRangeTest");
-
- private List<IndexSegment> _indexSegments = new ArrayList<>(NUM_SEGMENTS);
- private final String[] stringValues = new String[NUM_ROWS];
- private final long[] longValues = new long[NUM_ROWS];
-
- private Schema _schema;
- private TableConfig _tableConfig;
+ private static final long RANDOM_SEED = System.nanoTime();
+ private static final Random RANDOM = new Random(RANDOM_SEED);
+ private static final String ERROR_MESSAGE = "Random seed: " + RANDOM_SEED;
- @BeforeClass
- public void setUp() {
- createPinotTableSchema();
- createTestData();
- }
+ private final String[] _stringValues = new String[NUM_ROWS];
+ private final long[] _longValues = new long[NUM_ROWS];
- @AfterClass
- public void tearDown() {
- FileUtils.deleteQuietly(INDEX_DIR);
- }
-
- private void createPinotTableSchema() {
- _schema =
- new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
- .addMetric(M1, FieldSpec.DataType.INT).addMetric(M2,
FieldSpec.DataType.LONG).build();
- _tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
- }
-
- private void createTestData() {
- Random random = new Random();
- for (int rowIndex = 0; rowIndex < NUM_ROWS; rowIndex++) {
- GenericRow row = new GenericRow();
- stringValues[rowIndex] = RandomStringUtils.randomAlphanumeric(10);
- row.putValue(D1, stringValues[rowIndex]);
- row.putValue(M1, INT_BASE_VALUE + rowIndex);
- longValues[rowIndex] = random.nextLong();
- row.putValue(M2, longValues[rowIndex]);
- _rows.add(row);
- }
- }
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
@Override
protected String getFilter() {
@@ -111,121 +82,134 @@ public class RangePredicateWithSortedInvertedIndexTest
extends BaseQueriesTest {
@Override
protected IndexSegment getIndexSegment() {
- return _indexSegments.get(0);
+ return _indexSegment;
}
@Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return null;
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
}
- private void createSegment(TableConfig tableConfig, Schema schema,
RecordReader recordReader, String segmentName,
- String tableName)
+ @BeforeClass
+ public void setUp()
throws Exception {
- SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
- segmentGeneratorConfig.setTableName(tableName);
- segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
- segmentGeneratorConfig.setSegmentName(segmentName);
+ FileUtils.deleteQuietly(INDEX_DIR);
- SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig, recordReader);
- driver.build();
+ buildSegment();
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+ }
- File segmentIndexDir = new File(INDEX_DIR.getAbsolutePath(), segmentName);
- if (!segmentIndexDir.exists()) {
- throw new IllegalStateException("Segment generation failed");
- }
+ @AfterClass
+ public void tearDown() {
+ _indexSegment.destroy();
+ FileUtils.deleteQuietly(INDEX_DIR);
}
- private ImmutableSegment loadSegment(String segmentName)
+ private void buildSegment()
throws Exception {
- return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName),
ReadMode.heap);
+ List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+ for (int rowIndex = 0; rowIndex < NUM_ROWS; rowIndex++) {
+ GenericRow row = new GenericRow();
+ _stringValues[rowIndex] = RandomStringUtils.randomAlphanumeric(10);
+ row.putValue(D1, _stringValues[rowIndex]);
+ row.putValue(M1, INT_BASE_VALUE + rowIndex);
+ _longValues[rowIndex] = RANDOM.nextLong();
+ row.putValue(M2, _longValues[rowIndex]);
+ rows.add(row);
+ }
+
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
+ .addMetric(M1, FieldSpec.DataType.INT).addMetric(M2,
FieldSpec.DataType.LONG).build();
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig,
schema);
+ config.setOutDir(INDEX_DIR.getPath());
+ config.setTableName(TABLE_NAME);
+ config.setSegmentName(SEGMENT_NAME);
+
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+ driver.init(config, recordReader);
+ driver.build();
+ }
}
@Test
- public void testInnerSegmentQuery()
- throws Exception {
- Random random = new Random();
- try (RecordReader recordReader = new GenericRowRecordReader(_rows)) {
- createSegment(_tableConfig, _schema, recordReader, SEGMENT_NAME_1,
TABLE_NAME);
- final ImmutableSegment immutableSegment = loadSegment(SEGMENT_NAME_1);
- _indexSegments.add(immutableSegment);
-
- String query = "SELECT STRING_COL, INT_COL FROM TestTable WHERE INT_COL
>= 20000 LIMIT 100000";
- Pairs.IntPair pair = new Pairs.IntPair(20000, 29999);
- runQuery(query, 10000, Lists.newArrayList(pair), 2);
-
- query = "SELECT STRING_COL, INT_COL FROM TestTable WHERE INT_COL >=
20000 AND INT_COL <= 23666 LIMIT 100000";
- pair = new Pairs.IntPair(20000, 23666);
- runQuery(query, 3667, Lists.newArrayList(pair), 2);
-
- query = "SELECT STRING_COL, INT_COL FROM TestTable WHERE INT_COL <=
20000 LIMIT 100000";
- pair = new Pairs.IntPair(0, 20000);
- runQuery(query, 20001, Lists.newArrayList(pair), 2);
-
- String filter = "WHERE (INT_COL >= 15000 AND INT_COL <= 16665) OR
(INT_COL >= 18000 AND INT_COL <= 19887)";
- query = "SELECT STRING_COL, INT_COL FROM TestTable " + filter + " LIMIT
100000";
- pair = new Pairs.IntPair(15000, 16665);
- Pairs.IntPair pair1 = new Pairs.IntPair(18000, 19987);
- runQuery(query, 3554, Lists.newArrayList(pair, pair1), 2);
-
- // range predicate on sorted column which will use sorted inverted index
based iterator
- // along with range predicate on unsorted column that uses scan based
iterator
- int index = random.nextInt(NUM_ROWS + 1);
- long longPredicateValue = longValues[index];
- int count = 0;
- List<Pairs.IntPair> pairs = new ArrayList<>();
- Pairs.IntPair current = null;
- for (int i = 0 ; i < longValues.length; i++) {
- if (longValues[i] >= longPredicateValue && i >= 15000 && i <= 16665) {
- if (current == null) {
- current = new Pairs.IntPair(i, i);
+ public void testInnerSegmentQuery() {
+ String query = "SELECT STRING_COL, INT_COL FROM testTable WHERE INT_COL >=
20000 LIMIT 100000";
+ Pairs.IntPair pair = new Pairs.IntPair(20000, 29999);
+ runQuery(query, 10000, Lists.newArrayList(pair), 2);
+
+ query = "SELECT STRING_COL, INT_COL FROM testTable WHERE INT_COL >= 20000
AND INT_COL <= 23666 LIMIT 100000";
+ pair = new Pairs.IntPair(20000, 23666);
+ runQuery(query, 3667, Lists.newArrayList(pair), 2);
+
+ query = "SELECT STRING_COL, INT_COL FROM testTable WHERE INT_COL <= 20000
LIMIT 100000";
+ pair = new Pairs.IntPair(0, 20000);
+ runQuery(query, 20001, Lists.newArrayList(pair), 2);
+
+ String filter = "WHERE (INT_COL >= 15000 AND INT_COL <= 16665) OR (INT_COL
>= 18000 AND INT_COL <= 19887)";
+ query = "SELECT STRING_COL, INT_COL FROM testTable " + filter + " LIMIT
100000";
+ pair = new Pairs.IntPair(15000, 16665);
+ Pairs.IntPair pair1 = new Pairs.IntPair(18000, 19887);
+ runQuery(query, 3554, Lists.newArrayList(pair, pair1), 2);
+
+ // range predicate on sorted column which will use sorted inverted index
based iterator
+ // along with range predicate on unsorted column that uses scan based
iterator
+ int index = RANDOM.nextInt(NUM_ROWS);
+ long longPredicateValue = _longValues[index];
+ int count = 0;
+ List<Pairs.IntPair> pairs = new ArrayList<>();
+ Pairs.IntPair current = null;
+ for (int i = 0; i < _longValues.length; i++) {
+ if (_longValues[i] >= longPredicateValue && i >= 15000 && i <= 16665) {
+ if (current == null) {
+ current = new Pairs.IntPair(i, i);
+ } else {
+ if (i == current.getRight() + 1) {
+ current.setRight(i);
} else {
- if (i == current.getRight() + 1) {
- current.setRight(i);
- } else {
- if (i <= longValues.length - 2) {
- pairs.add(current);
- current = new Pairs.IntPair(i, i);
- }
+ if (i <= _longValues.length - 2) {
+ pairs.add(current);
+ current = new Pairs.IntPair(i, i);
}
}
- count++;
}
+ count++;
}
- pairs.add(current);
- filter = "WHERE INT_COL >= 15000 AND INT_COL <= 16665 AND LONG_COL >= "
+ longPredicateValue;
- query = "SELECT STRING_COL, INT_COL, LONG_COL FROM TestTable " + filter
+ " LIMIT 100000";
- runQuery(query, count, pairs, 3);
-
- // empty resultset
- query = "SELECT STRING_COL, INT_COL FROM TestTable WHERe INT_COL < 0
LIMIT 100000";
- runQuery(query, 0, null, 0);
-
- // empty resultset
- query = "SELECT STRING_COL, INT_COL FROM TestTable WHERE INT_COL > 30000
LIMIT 100000";
- runQuery(query, 0, null, 0);
- } finally {
- destroySegments();
}
+ pairs.add(current);
+ filter = "WHERE INT_COL >= 15000 AND INT_COL <= 16665 AND LONG_COL >= " +
longPredicateValue;
+ query = "SELECT STRING_COL, INT_COL, LONG_COL FROM testTable " + filter +
" LIMIT 100000";
+ runQuery(query, count, pairs, 3);
+
+ // empty resultset
+ query = "SELECT STRING_COL, INT_COL FROM testTable WHERe INT_COL < 0 LIMIT
100000";
+ runQuery(query, 0, null, 0);
+
+ // empty resultset
+ query = "SELECT STRING_COL, INT_COL FROM testTable WHERE INT_COL > 30000
LIMIT 100000";
+ runQuery(query, 0, null, 0);
}
private void runQuery(String query, int count, List<Pairs.IntPair> intPairs,
int numColumns) {
SelectionOnlyOperator operator = getOperatorForQuery(query);
IntermediateResultsBlock block = operator.nextBlock();
Collection<Object[]> rows = block.getSelectionResult();
- Assert.assertNotNull(rows);
- Assert.assertEquals(rows.size(), count);
+ assertNotNull(rows, ERROR_MESSAGE);
+ assertEquals(rows.size(), count, ERROR_MESSAGE);
if (count > 0) {
Pairs.IntPair pair = intPairs.get(0);
int startPos = pair.getLeft();
int pairPos = 0;
for (Object[] row : rows) {
- Assert.assertEquals(numColumns, row.length);
- Assert.assertEquals(row[0], stringValues[startPos]);
- Assert.assertEquals(row[1], startPos);
+ assertEquals(numColumns, row.length, ERROR_MESSAGE);
+ assertEquals(row[0], _stringValues[startPos], ERROR_MESSAGE);
+ assertEquals(row[1], startPos, ERROR_MESSAGE);
if (numColumns == 3) {
- Assert.assertEquals(row[2], longValues[startPos]);
+ assertEquals(row[2], _longValues[startPos], ERROR_MESSAGE);
}
startPos++;
if (startPos > pair.getRight() && pairPos <= intPairs.size() - 2) {
@@ -236,13 +220,4 @@ public class RangePredicateWithSortedInvertedIndexTest
extends BaseQueriesTest {
}
}
}
-
- private void destroySegments() {
- for (IndexSegment indexSegment : _indexSegments) {
- if (indexSegment != null) {
- indexSegment.destroy();
- }
- }
- _indexSegments.clear();
- }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
index d6b89cf..b7df980 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/SelectionOnlyEarlyTerminationTest.java
@@ -40,7 +40,7 @@ public class SelectionOnlyEarlyTerminationTest extends
BaseSingleValueQueriesTes
* (2 * MAX_NUM_THREADS_PER_QUERY) segments per server.
*/
@Override
- protected int getNumSegmentDataManagers() {
+ protected int getNumSegments() {
return CombineOperator.MAX_NUM_THREADS_PER_QUERY * 2;
}
@@ -52,7 +52,7 @@ public class SelectionOnlyEarlyTerminationTest extends
BaseSingleValueQueriesTes
@Test
public void testSelectOnlyQuery() {
int numThreadsPerServer = CombineOperator.MAX_NUM_THREADS_PER_QUERY;
- int numSegmentsPerServer = getNumSegmentDataManagers();
+ int numSegmentsPerServer = getNumSegments();
// LIMIT = 5, 10, 20, 40, 80, 160, 320, 640, 1280, 2560, 5120, 10240, 20480
for (int limit = 5; limit < NUM_DOCS_PER_SEGMENT; limit *= 2) {
@@ -95,7 +95,7 @@ public class SelectionOnlyEarlyTerminationTest extends
BaseSingleValueQueriesTes
*/
@Test
public void testSelectWithOrderByQuery() {
- int numSegmentsPerServer = getNumSegmentDataManagers();
+ int numSegmentsPerServer = getNumSegments();
String query = "SELECT column11, column18, column1 FROM testTable ORDER BY
column11";
int numColumnsInSelection = 3;
BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
index 29300ad..b1572b2 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/SerializedBytesQueriesTest.java
@@ -29,20 +29,12 @@ import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.GroupByResult;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.common.ObjectSerDeUtils;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
-import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -56,6 +48,12 @@ import
org.apache.pinot.core.query.aggregation.function.customobject.QuantileDig
import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -111,8 +109,8 @@ public class SerializedBytesQueriesTest extends
BaseQueriesTest {
private final QuantileDigest[] _quantileDigests = new
QuantileDigest[NUM_ROWS];
private final TDigest[] _tDigests = new TDigest[NUM_ROWS];
- private ImmutableSegment _indexSegment;
- private List<SegmentDataManager> _segmentDataManagers;
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
@Override
protected String getFilter() {
@@ -125,8 +123,8 @@ public class SerializedBytesQueriesTest extends
BaseQueriesTest {
}
@Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
}
@BeforeClass
@@ -135,9 +133,9 @@ public class SerializedBytesQueriesTest extends
BaseQueriesTest {
FileUtils.deleteQuietly(INDEX_DIR);
buildSegment();
- _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
SEGMENT_NAME), ReadMode.mmap);
- _segmentDataManagers =
- Arrays.asList(new ImmutableSegmentDataManager(_indexSegment), new
ImmutableSegmentDataManager(_indexSegment));
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
private void buildSegment()
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
index da29a42..e8ab6d4 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/TextSearchQueriesTest.java
@@ -51,8 +51,6 @@ import
org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.response.broker.SelectionResults;
import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
@@ -63,6 +61,7 @@ import
org.apache.pinot.core.operator.query.AggregationOperator;
import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.FieldSpec;
@@ -84,33 +83,20 @@ import org.testng.annotations.Test;
* The test table has a SKILLS column and QUERY_LOG column. Text index is
created
* on each of these columns.
*/
-public class TextSearchQueriesTest extends BaseQueriesTest {
-
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"TextSearchQueries");
+public class TextSearchQueriesTest extends BaseQueriesTest { private static
final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"TextSearchQueriesTest");
private static final String TABLE_NAME = "MyTable";
- private static final String SEGMENT_NAME = TABLE_NAME +
"_100000000_200000000";
+ private static final String SEGMENT_NAME = "testSegment";
+
private static final String QUERY_LOG_TEXT_COL_NAME = "QUERY_LOG_TEXT_COL";
private static final String SKILLS_TEXT_COL_NAME = "SKILLS_TEXT_COL";
private static final String INT_COL_NAME = "INT_COL";
- private static final List<String> textIndexColumns = new ArrayList<>();
+ private static final List<String> TEXT_INDEX_COLUMNS =
Arrays.asList(QUERY_LOG_TEXT_COL_NAME, SKILLS_TEXT_COL_NAME);
private static final int INT_BASE_VALUE = 1000;
- private List<GenericRow> _rows = new ArrayList<>();
- private RecordReader _recordReader;
- Schema _schema;
- private TableConfig _tableConfig;
- private List<IndexSegment> _indexSegments = new ArrayList<>(1);
- private List<SegmentDataManager> _segmentDataManagers = new ArrayList<>();
+ private final List<GenericRow> _rows = new ArrayList<>();
- @BeforeClass
- public void setUp()
- throws Exception {
- createPinotTableSchema();
- createTestData();
- _recordReader = new GenericRowRecordReader(_rows);
- createSegment();
- loadSegment();
- }
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
@Override
protected String getFilter() {
@@ -119,67 +105,66 @@ public class TextSearchQueriesTest extends
BaseQueriesTest {
@Override
protected IndexSegment getIndexSegment() {
- return _indexSegments.get(0);
+ return _indexSegment;
}
@Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteQuietly(INDEX_DIR);
+
+ buildSegment();
+ IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+ indexLoadingConfig.setTextIndexColumns(new HashSet<>(TEXT_INDEX_COLUMNS));
+ ImmutableSegment immutableSegment =
+ ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME),
indexLoadingConfig);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
@AfterClass
public void tearDown() {
- for (IndexSegment indexSegment : _indexSegments) {
- if (indexSegment != null) {
- indexSegment.destroy();
- }
- }
- _indexSegments.clear();
+ _indexSegment.destroy();
FileUtils.deleteQuietly(INDEX_DIR);
}
- private void createPinotTableSchema() {
- _schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ private void buildSegment()
+ throws Exception {
+ List<GenericRow> rows = createTestData();
+
+ List<FieldConfig> fieldConfigs = new
ArrayList<>(TEXT_INDEX_COLUMNS.size());
+ for (String textIndexColumn : TEXT_INDEX_COLUMNS) {
+ fieldConfigs
+ .add(new FieldConfig(textIndexColumn, FieldConfig.EncodingType.RAW,
FieldConfig.IndexType.TEXT, null));
+ }
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(TEXT_INDEX_COLUMNS)
+ .setFieldConfigList(fieldConfigs).build();
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addSingleValueDimension(QUERY_LOG_TEXT_COL_NAME,
FieldSpec.DataType.STRING)
.addSingleValueDimension(SKILLS_TEXT_COL_NAME,
FieldSpec.DataType.STRING)
.addMetric(INT_COL_NAME, FieldSpec.DataType.INT).build();
- _tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
- }
-
- private void createSegment()
- throws Exception {
- textIndexColumns.add(QUERY_LOG_TEXT_COL_NAME);
- textIndexColumns.add(SKILLS_TEXT_COL_NAME);
- SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(_tableConfig, _schema);
- segmentGeneratorConfig.setTableName(TABLE_NAME);
- segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
- segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
- segmentGeneratorConfig.setRawIndexCreationColumns(textIndexColumns);
- segmentGeneratorConfig.setTextIndexCreationColumns(textIndexColumns);
- segmentGeneratorConfig.setSkipTimeValueCheck(true);
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig,
schema);
+ config.setOutDir(INDEX_DIR.getPath());
+ config.setTableName(TABLE_NAME);
+ config.setSegmentName(SEGMENT_NAME);
SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig, _recordReader);
- driver.build();
-
- File segmentIndexDir = new File(INDEX_DIR.getAbsolutePath(), SEGMENT_NAME);
- if (!segmentIndexDir.exists()) {
- throw new IllegalStateException("Segment generation failed");
+ try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+ driver.init(config, recordReader);
+ driver.build();
}
}
- private void loadSegment()
+ private List<GenericRow> createTestData()
throws Exception {
- IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
- indexLoadingConfig.setTextIndexColumns(new HashSet<>(textIndexColumns));
- ImmutableSegment segment = ImmutableSegmentLoader.load(new File(INDEX_DIR,
SEGMENT_NAME), indexLoadingConfig);
- _indexSegments.add(segment);
- _segmentDataManagers =
- Arrays.asList(new ImmutableSegmentDataManager(segment), new
ImmutableSegmentDataManager(segment));
- }
+ List<GenericRow> rows = new ArrayList<>();
- private void createTestData()
- throws Exception {
// read the skills file
URL resourceUrl =
getClass().getClassLoader().getResource("data/text_search_data/skills.txt");
File skillFile = new File(resourceUrl.getFile());
@@ -209,10 +194,12 @@ public class TextSearchQueriesTest extends
BaseQueriesTest {
} else {
row.putField(SKILLS_TEXT_COL_NAME, skills[counter]);
}
- _rows.add(row);
+ rows.add(row);
counter++;
}
}
+
+ return rows;
}
/**
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index 2344fe1..0b5c9f3 100644
---
a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -25,23 +25,12 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.segment.ReadMode;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.TimeUtils;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
import org.apache.pinot.core.data.readers.GenericRowRecordReader;
-import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
@@ -53,6 +42,13 @@ import
org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.TimeGranularitySpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@@ -61,8 +57,14 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
public class TransformQueriesTest extends BaseQueriesTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"TransformQueriesTest");
+ private static final String TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME = "testSegment";
private static final String D1 = "STRING_COL";
private static final String M1 = "INT_COL1";
@@ -70,261 +72,184 @@ public class TransformQueriesTest extends BaseQueriesTest
{
private static final String M3 = "LONG_COL1";
private static final String M4 = "LONG_COL2";
private static final String TIME = "T";
- private static final String TABLE_NAME = "FOO";
- private static final String SEGMENT_NAME_1 = "FOO_SEGMENT_1";
- private static final String SEGMENT_NAME_2 = "FOO_SEGMENT_2";
- private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(),
"TransformQueriesTest");
+ private static final int NUM_ROWS = 10;
- private Schema _schema;
- private TableConfig _tableConfig;
- private List<IndexSegment> _indexSegments = new ArrayList<>();
- private List<SegmentDataManager> _segmentDataManagers;
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
- @BeforeClass
- public void setUp() {
- _schema =
- new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
- .addSingleValueDimension(M1,
FieldSpec.DataType.INT).addSingleValueDimension(M2, FieldSpec.DataType.INT)
- .addSingleValueDimension(M3,
FieldSpec.DataType.LONG).addSingleValueDimension(M4, FieldSpec.DataType.LONG)
- .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, TIME), null)
- .build();
- _tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME).build();
+ @Override
+ protected String getFilter() {
+ return "";
}
- @AfterClass
- public void tearDown() {
- FileUtils.deleteQuietly(INDEX_DIR);
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _indexSegment;
}
- private void destroySegments() {
- for (IndexSegment indexSegment : _indexSegments) {
- if (indexSegment != null) {
- indexSegment.destroy();
- }
- }
- _indexSegments.clear();
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
}
- @Test
- public void testTransformWithAvgInnerSegment()
+ @BeforeClass
+ public void setUp()
throws Exception {
- try {
- final List<GenericRow> rows = createDataSet(10);
- try (final RecordReader recordReader = new GenericRowRecordReader(rows))
{
- createSegment(_tableConfig, _schema, recordReader, SEGMENT_NAME_1,
TABLE_NAME);
- final ImmutableSegment segment = loadSegment(SEGMENT_NAME_1);
- _indexSegments.add(segment);
-
- String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM foo";
- runAndVerifyInnerSegmentQuery(query, -10000.00, 10);
-
- query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM foo";
- runAndVerifyInnerSegmentQuery(query, 4990000.00, 10);
-
- query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM foo";
- runAndVerifyInnerSegmentQuery(query, 5000000.00, 10);
-
- query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM foo";
- runAndVerifyInnerSegmentQuery(query, 30000.00, 10);
-
- query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM foo";
- runAndVerifyInnerSegmentQuery(query, 5010000.00, 10);
-
- query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM foo";
- runAndVerifyInnerSegmentQuery(query, 15000000.00, 10);
-
- query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1,
LONG_COL2))) FROM foo";
- runAndVerifyInnerSegmentQuery(query, 10.00, 10);
-
- try {
- query = "SELECT AVG(SUB(INT_COL1, STRING_COL)) FROM foo";
- runAndVerifyInnerSegmentQuery(query, -10000.00, 10);
- Assert.fail("Query should have failed");
- } catch (Exception e) {
- }
-
- try {
- query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1,
STRING_COL))) FROM foo";
- runAndVerifyInnerSegmentQuery(query, 10.00, 10);
- Assert.fail("Query should have failed");
- } catch (Exception e) {
- }
- }
- } finally {
- destroySegments();
- }
+ FileUtils.deleteQuietly(INDEX_DIR);
+
+ buildSegment();
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new
File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
}
- @Test
- public void testTransformWithDateTruncInnerSegment()
+ protected void buildSegment()
throws Exception {
- Object[] columns = new Object[]{"Pinot", 1000, 2000, 500000, 1000000};
- long start = new DateTime(1973, 1, 8, 14, 6, 4, 3,
DateTimeZone.UTC).getMillis();
-
- List<GenericRow> rows = new ArrayList<>();
- for (int i = 0; i < 7; i++) {
- GenericRow row = new GenericRow();
- row.putField(D1, columns[0]);
- row.putField(M1, columns[1]);
- row.putField(M2, columns[2]);
- row.putField(M3, columns[3]);
- row.putField(M4, columns[4]);
- row.putField(TIME, ThreadLocalRandom.current().nextLong(start, start +
5000));
+ GenericRow row = new GenericRow();
+ row.putValue(D1, "Pinot");
+ row.putValue(M1, 1000);
+ row.putValue(M2, 2000);
+ row.putValue(M3, 500000);
+ row.putValue(M4, 1000000);
+ row.putValue(TIME, new DateTime(1973, 1, 8, 14, 6, 4, 3,
DateTimeZone.UTC).getMillis());
+
+ List<GenericRow> rows = new ArrayList<>(NUM_ROWS);
+ for (int i = 0; i < NUM_ROWS; i++) {
rows.add(row);
}
- try (final RecordReader recordReader = new GenericRowRecordReader(rows)) {
- createSegment(_tableConfig, _schema, recordReader, SEGMENT_NAME_1,
TABLE_NAME);
- final ImmutableSegment segment = loadSegment(SEGMENT_NAME_1);
- _indexSegments.add(segment);
-
- String query =
- "SELECT COUNT(*) FROM foo GROUP BY DATETRUNC('week', ADD(SUB(DIV(T,
1000), INT_COL2), INT_COL2), 'SECONDS', 'Europe/Berlin')";
- verifyDateTruncationResult(query, rows.size(), "95295600");
-
- query =
- "SELECT COUNT(*) FROM foo GROUP BY DATETRUNC('week',
DIV(MULT(DIV(ADD(SUB(T, 5), 5), 1000), INT_COL2), INT_COL2), 'SECONDS',
'Europe/Berlin', 'MILLISECONDS')";
- verifyDateTruncationResult(query, rows.size(), "95295600000");
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME).build();
+ Schema schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
+ .addSingleValueDimension(M1,
FieldSpec.DataType.INT).addSingleValueDimension(M2, FieldSpec.DataType.INT)
+ .addSingleValueDimension(M3,
FieldSpec.DataType.LONG).addSingleValueDimension(M4, FieldSpec.DataType.LONG)
+ .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, TIME), null).build();
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig,
schema);
+ config.setOutDir(INDEX_DIR.getPath());
+ config.setTableName(TABLE_NAME);
+ config.setSegmentName(SEGMENT_NAME);
- query = "SELECT COUNT(*) FROM foo GROUP BY DATETRUNC('quarter', T,
'MILLISECONDS')";
- verifyDateTruncationResult(query, rows.size(), "94694400000");
- } finally {
- destroySegments();
+ SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
+ try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
+ driver.init(config, recordReader);
+ driver.build();
}
}
- private void verifyDateTruncationResult(String query, long countResult,
String stringKey) {
- AggregationGroupByOperator aggregtionGroupByOperator =
getOperatorForQuery(query);
- IntermediateResultsBlock resultsBlock =
aggregtionGroupByOperator.nextBlock();
- final AggregationGroupByResult aggregationGroupByResult =
resultsBlock.getAggregationGroupByResult();
- List<GroupKeyGenerator.GroupKey> groupKeys =
ImmutableList.copyOf(aggregationGroupByResult.getGroupKeyIterator());
- Assert.assertEquals(groupKeys.size(), 1);
- Assert.assertEquals(groupKeys.get(0)._stringKey, stringKey);
- Object resultForKey =
aggregationGroupByResult.getResultForKey(groupKeys.get(0), 0);
- Assert.assertEquals(resultForKey, countResult);
+ @AfterClass
+ public void tearDown() {
+ _indexSegment.destroy();
+ FileUtils.deleteQuietly(INDEX_DIR);
}
@Test
- public void testTransformWithAvgInterSegmentInterServer()
- throws Exception {
- try {
- final List<GenericRow> segmentOneRows = createDataSet(10);
- final List<GenericRow> segmentTwoRows = createDataSet(10);
-
- try (final RecordReader recordReaderOne = new
GenericRowRecordReader(segmentOneRows);
- final RecordReader recordReaderTwo = new
GenericRowRecordReader(segmentTwoRows)) {
- createSegment(_tableConfig, _schema, recordReaderOne, SEGMENT_NAME_1,
TABLE_NAME);
- createSegment(_tableConfig, _schema, recordReaderTwo, SEGMENT_NAME_2,
TABLE_NAME);
-
- final ImmutableSegment segmentOne = loadSegment(SEGMENT_NAME_1);
- final ImmutableSegment segmentTwo = loadSegment(SEGMENT_NAME_2);
-
- _indexSegments.add(segmentOne);
- _indexSegments.add(segmentTwo);
+ public void testTransformWithAvgInnerSegment() {
+ String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM testTable";
+ runAndVerifyInnerSegmentQuery(query, -10000.0, 10);
- _segmentDataManagers =
- Arrays.asList(new ImmutableSegmentDataManager(segmentOne), new
ImmutableSegmentDataManager(segmentTwo));
+ query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM testTable";
+ runAndVerifyInnerSegmentQuery(query, 4990000.0, 10);
- String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM foo";
- runAndVerifyInterSegmentQuery(query, "-1000.00000");
+ query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM testTable";
+ runAndVerifyInnerSegmentQuery(query, 5000000.0, 10);
- query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM foo";
- runAndVerifyInterSegmentQuery(query, "499000.00000");
+ query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM testTable";
+ runAndVerifyInnerSegmentQuery(query, 30000.0, 10);
- query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM foo";
- runAndVerifyInterSegmentQuery(query, "500000.00000");
+ query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM testTable";
+ runAndVerifyInnerSegmentQuery(query, 5010000.0, 10);
- query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM foo";
- runAndVerifyInterSegmentQuery(query, "3000.00000");
+ query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM testTable";
+ runAndVerifyInnerSegmentQuery(query, 15000000.0, 10);
- query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM foo";
- runAndVerifyInterSegmentQuery(query, "501000.00000");
+ query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1,
LONG_COL2))) FROM testTable";
+ runAndVerifyInnerSegmentQuery(query, 10.0, 10);
- query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM foo";
- runAndVerifyInterSegmentQuery(query, "1500000.00000");
-
- query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1,
LONG_COL2))) FROM foo";
- runAndVerifyInterSegmentQuery(query, "1.00000");
- }
- } finally {
- destroySegments();
+ try {
+ query = "SELECT AVG(SUB(INT_COL1, STRING_COL)) FROM testTable";
+ runAndVerifyInnerSegmentQuery(query, -10000.0, 10);
+ Assert.fail("Query should have failed");
+ } catch (Exception e) {
+ // Expected
}
- }
- private List<GenericRow> createDataSet(final int numRows) {
- final ThreadLocalRandom random = ThreadLocalRandom.current();
- final List<GenericRow> rows = new ArrayList<>(numRows);
- Object[] columns;
-
- // ROW
- GenericRow row = new GenericRow();
- columns = new Object[]{"Pinot", 1000, 2000, 500000, 1000000};
- row.putField(D1, columns[0]);
- row.putField(M1, columns[1]);
- row.putField(M2, columns[2]);
- row.putField(M3, columns[3]);
- row.putField(M4, columns[4]);
- row.putField(TIME, random.nextLong(TimeUtils.getValidMinTimeMillis(),
TimeUtils.getValidMaxTimeMillis()));
-
- for (int i = 0; i < numRows; i++) {
- rows.add(row);
+ try {
+ query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1,
STRING_COL))) FROM testTable";
+ runAndVerifyInnerSegmentQuery(query, 10.00, 10);
+ Assert.fail("Query should have failed");
+ } catch (Exception e) {
+ // Expected
}
- return rows;
}
- private void runAndVerifyInnerSegmentQuery(String query, double sum, long
count) {
+ private void runAndVerifyInnerSegmentQuery(String query, double expectedSum,
int expectedCount) {
AggregationOperator aggregationOperator = getOperatorForQuery(query);
IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
- final List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ List<Object> aggregationResult = resultsBlock.getAggregationResult();
+ assertNotNull(aggregationResult);
+ assertEquals(aggregationResult.size(), 1);
AvgPair avgPair = (AvgPair) aggregationResult.get(0);
- Assert.assertEquals(avgPair.getSum(), sum);
- Assert.assertEquals(avgPair.getCount(), count);
+ assertEquals(avgPair.getSum(), expectedSum);
+ assertEquals(avgPair.getCount(), expectedCount);
}
- private void runAndVerifyInterSegmentQuery(String query, String serialized) {
- BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
- List<AggregationResult> aggregationResults =
brokerResponse.getAggregationResults();
- Assert.assertEquals(aggregationResults.size(), 1);
- Serializable value = aggregationResults.get(0).getValue();
- Assert.assertEquals(value.toString(), serialized);
- }
+ @Test
+ public void testTransformWithDateTruncInnerSegment() {
+ String query =
+ "SELECT COUNT(*) FROM testTable GROUP BY DATETRUNC('week',
ADD(SUB(DIV(T, 1000), INT_COL2), INT_COL2), 'SECONDS', 'Europe/Berlin')";
+ verifyDateTruncationResult(query, "95295600");
- @Override
- protected String getFilter() {
- return "";
- }
+ query =
+ "SELECT COUNT(*) FROM testTable GROUP BY DATETRUNC('week',
DIV(MULT(DIV(ADD(SUB(T, 5), 5), 1000), INT_COL2), INT_COL2), 'SECONDS',
'Europe/Berlin', 'MILLISECONDS')";
+ verifyDateTruncationResult(query, "95295600000");
- @Override
- protected IndexSegment getIndexSegment() {
- return _indexSegments.get(0);
+ query = "SELECT COUNT(*) FROM testTable GROUP BY DATETRUNC('quarter', T,
'MILLISECONDS')";
+ verifyDateTruncationResult(query, "94694400000");
}
- @Override
- protected List<SegmentDataManager> getSegmentDataManagers() {
- return _segmentDataManagers;
+ private void verifyDateTruncationResult(String query, String
expectedStringKey) {
+ AggregationGroupByOperator aggregationGroupByOperator =
getOperatorForQuery(query);
+ IntermediateResultsBlock resultsBlock =
aggregationGroupByOperator.nextBlock();
+ AggregationGroupByResult aggregationGroupByResult =
resultsBlock.getAggregationGroupByResult();
+ assertNotNull(aggregationGroupByResult);
+ List<GroupKeyGenerator.GroupKey> groupKeys =
ImmutableList.copyOf(aggregationGroupByResult.getGroupKeyIterator());
+ assertEquals(groupKeys.size(), 1);
+ assertEquals(groupKeys.get(0)._stringKey, expectedStringKey);
+ Object resultForKey =
aggregationGroupByResult.getResultForKey(groupKeys.get(0), 0);
+ assertEquals(resultForKey, (long) NUM_ROWS);
}
- private void createSegment(TableConfig tableConfig, Schema schema,
RecordReader recordReader, String segmentName,
- String tableName)
- throws Exception {
- SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
- segmentGeneratorConfig.setTableName(tableName);
- segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
- segmentGeneratorConfig.setSegmentName(segmentName);
+ @Test
+ public void testTransformWithAvgInterSegmentInterServer() {
+ String query = "SELECT AVG(SUB(INT_COL1, INT_COL2)) FROM testTable";
+ runAndVerifyInterSegmentQuery(query, "-1000.00000");
- SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(segmentGeneratorConfig, recordReader);
- driver.build();
+ query = "SELECT AVG(SUB(LONG_COL1, INT_COL1)) FROM testTable";
+ runAndVerifyInterSegmentQuery(query, "499000.00000");
- File segmentIndexDir = new File(INDEX_DIR.getAbsolutePath(), segmentName);
- if (!segmentIndexDir.exists()) {
- throw new IllegalStateException("Segment generation failed");
- }
+ query = "SELECT AVG(SUB(LONG_COL2, LONG_COL1)) FROM testTable";
+ runAndVerifyInterSegmentQuery(query, "500000.00000");
+
+ query = "SELECT AVG(ADD(INT_COL1, INT_COL2)) FROM testTable";
+ runAndVerifyInterSegmentQuery(query, "3000.00000");
+
+ query = "SELECT AVG(ADD(INT_COL1, LONG_COL1)) FROM testTable";
+ runAndVerifyInterSegmentQuery(query, "501000.00000");
+
+ query = "SELECT AVG(ADD(LONG_COL1, LONG_COL2)) FROM testTable";
+ runAndVerifyInterSegmentQuery(query, "1500000.00000");
+
+ query = "SELECT AVG(ADD(DIV(INT_COL1, INT_COL2), DIV(LONG_COL1,
LONG_COL2))) FROM testTable";
+ runAndVerifyInterSegmentQuery(query, "1.00000");
}
- private ImmutableSegment loadSegment(String segmentName)
- throws Exception {
- return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName),
ReadMode.heap);
+ private void runAndVerifyInterSegmentQuery(String query, String
expectedValue) {
+ BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+ List<AggregationResult> aggregationResults =
brokerResponse.getAggregationResults();
+ assertEquals(aggregationResults.size(), 1);
+ Serializable value = aggregationResults.get(0).getValue();
+ assertEquals(value, expectedValue);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]