This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1f69041 [Part 2] Add geo support - add a geo aggregate function
st_union (#5744)
1f69041 is described below
commit 1f6904152bc80569ab6e4e90251ade344d72c14f
Author: Yupeng Fu <[email protected]>
AuthorDate: Wed Jul 29 17:04:26 2020 -0700
[Part 2] Add geo support - add a geo aggregate function st_union (#5744)
Added a new aggregate function that unions a set of geometry objects and
returns a multi-geometry object.
---
.../common/function/AggregationFunctionType.java | 3 +
.../apache/pinot/core/common/ObjectSerDeUtils.java | 29 ++-
.../pinot/core/geospatial/GeometryUtils.java | 2 +
.../function/AggregationFunctionFactory.java | 2 +
.../function/AggregationFunctionVisitorBase.java | 4 +
.../function/StUnionAggregationFunction.java | 140 +++++++++++
.../apache/pinot/queries/StUnionQueriesTest.java | 258 +++++++++++++++++++++
7 files changed, 436 insertions(+), 2 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
index ff3fb50..5125a8d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/AggregationFunctionType.java
@@ -36,6 +36,9 @@ public enum AggregationFunctionType {
PERCENTILEEST("percentileEst"),
PERCENTILETDIGEST("percentileTDigest"),
+ // geo aggregation functions
+ ST_UNION("ST_Union"),
+
// Aggregation functions for multi-valued columns
COUNTMV("countMV"),
MINMV("minMV"),
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index f471e37..637d5e1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -36,10 +36,12 @@ import java.util.Map;
import org.apache.datasketches.memory.Memory;
import org.apache.datasketches.theta.Sketch;
import org.apache.pinot.common.utils.StringUtil;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
import
org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
import
org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
import
org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
+import org.locationtech.jts.geom.Geometry;
/**
@@ -63,7 +65,8 @@ public class ObjectSerDeUtils {
IntSet(9),
TDigest(10),
DistinctTable(11),
- DataSketch(12);
+ DataSketch(12),
+ Geometry(13);
private int _value;
@@ -102,6 +105,8 @@ public class ObjectSerDeUtils {
return ObjectType.DistinctTable;
} else if (value instanceof Sketch) {
return ObjectType.DataSketch;
+ } else if (value instanceof Geometry) {
+ return ObjectType.Geometry;
} else {
throw new IllegalArgumentException("Unsupported type of value: " +
value.getClass().getSimpleName());
}
@@ -482,6 +487,25 @@ public class ObjectSerDeUtils {
}
};
+  /**
+   * SerDe for JTS {@link Geometry} values. Both directions delegate to {@link GeometrySerializer}.
+   */
+  public static final ObjectSerDe<Geometry> GEOMETRY_SER_DE = new ObjectSerDe<Geometry>() {
+    @Override
+    public byte[] serialize(Geometry value) {
+      return GeometrySerializer.serialize(value);
+    }
+
+    @Override
+    public Geometry deserialize(byte[] bytes) {
+      return GeometrySerializer.deserialize(bytes);
+    }
+
+    @Override
+    public Geometry deserialize(ByteBuffer byteBuffer) {
+      // Drain everything that remains in the buffer, then reuse the byte[] overload above.
+      byte[] remaining = new byte[byteBuffer.remaining()];
+      byteBuffer.get(remaining);
+      return deserialize(remaining);
+    }
+  };
+
// NOTE: DO NOT change the order, it has to be the same order as the ObjectType
//@formatter:off
private static final ObjectSerDe[] SER_DES = {
@@ -497,7 +521,8 @@ public class ObjectSerDeUtils {
INT_SET_SER_DE,
TDIGEST_SER_DE,
DISTINCT_TABLE_SER_DE,
- DATA_SKETCH_SER_DE
+ DATA_SKETCH_SER_DE,
+ GEOMETRY_SER_DE
};
//@formatter:on
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java
index 0d0cc97..309d4dd 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/geospatial/GeometryUtils.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.geospatial;
import com.google.common.base.Joiner;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.Point;
import org.locationtech.jts.geom.PrecisionModel;
@@ -39,6 +40,7 @@ public class GeometryUtils {
public static final double EARTH_RADIUS_KM = 6371.01;
public static final double EARTH_RADIUS_M = EARTH_RADIUS_KM * 1000.0;
public static final Joiner OR_JOINER = Joiner.on(" or ");
+ // Shared geometry built by the no-argument createPoint(). StUnionAggregationFunction (added in this
+ // commit) returns it when a result holder contains no geometry.
+ public static final Geometry EMPTY_POINT = GEOMETRY_FACTORY.createPoint();
private GeometryUtils() {
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index 021c188..bfe8550 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -152,6 +152,8 @@ public class AggregationFunctionFactory {
case DISTINCT:
return new DistinctAggregationFunction(arguments,
queryContext.getOrderByExpressions(),
queryContext.getLimit());
+ case ST_UNION:
+ return new StUnionAggregationFunction(firstArgument);
default:
throw new IllegalArgumentException();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
index 2b5b615..8e0a8a6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionVisitorBase.java
@@ -98,5 +98,9 @@ public class AggregationFunctionVisitorBase {
public void visit(DistinctCountThetaSketchAggregationFunction function) {
}
+
+  /**
+   * Visits an {@link StUnionAggregationFunction}. The base implementation does nothing.
+   *
+   * @param function the function being visited
+   */
+  public void visit(StUnionAggregationFunction function) {
+  }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
new file mode 100644
index 0000000..8ae1d97
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/StUnionAggregationFunction.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function;
+
+import java.util.Map;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.geospatial.GeometryUtils;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import
org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.locationtech.jts.geom.Geometry;
+
+
+/**
+ * Aggregation function for {@code ST_UNION}. It deserializes the bytes values of a single input expression with
+ * {@link GeometrySerializer} and folds them into one {@link Geometry} via {@link Geometry#union(Geometry)}.
+ *
+ * <p>The intermediate result is a {@link Geometry}. The final result is that geometry serialized by
+ * {@link GeometrySerializer} and wrapped in a {@link ByteArray}.
+ */
+public class StUnionAggregationFunction extends BaseSingleInputAggregationFunction<Geometry, ByteArray> {
+
+  /**
+   * Constructor for the class.
+   *
+   * @param expression Expression to aggregate on.
+   */
+  public StUnionAggregationFunction(ExpressionContext expression) {
+    super(expression);
+  }
+
+  /**
+   * @return {@link AggregationFunctionType#ST_UNION}
+   */
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.ST_UNION;
+  }
+
+  /**
+   * Dispatches to the visitor overload that accepts this class.
+   */
+  @Override
+  public void accept(AggregationFunctionVisitorBase visitor) {
+    visitor.visit(this);
+  }
+
+  /**
+   * @return a new object-based holder for the aggregation-only result
+   */
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  /**
+   * @return a new object-based holder for group-by results, created with the given capacities
+   */
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  /**
+   * Unions the first {@code length} values of the input expression into the geometry already stored in the holder
+   * (if any) and writes the result back to the holder.
+   */
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] serializedValues = blockValSetMap.get(_expression).getBytesValuesSV();
+    Geometry union = aggregationResultHolder.getResult();
+    for (int idx = 0; idx < length; idx++) {
+      union = unionWith(union, GeometrySerializer.deserialize(serializedValues[idx]));
+    }
+    aggregationResultHolder.setValue(union);
+  }
+
+  /**
+   * For each of the first {@code length} values, unions the value into the geometry stored under the matching
+   * entry of {@code groupKeyArray}.
+   */
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] serializedValues = blockValSetMap.get(_expression).getBytesValuesSV();
+    for (int idx = 0; idx < length; idx++) {
+      int groupKey = groupKeyArray[idx];
+      Geometry value = GeometrySerializer.deserialize(serializedValues[idx]);
+      unionIntoGroup(groupByResultHolder, groupKey, value);
+    }
+  }
+
+  /**
+   * Same as {@link #aggregateGroupBySV}, except that each value carries an array of group keys and is unioned
+   * into every one of those groups. The value is deserialized once per entry, not once per group.
+   */
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    byte[][] serializedValues = blockValSetMap.get(_expression).getBytesValuesSV();
+    for (int idx = 0; idx < length; idx++) {
+      Geometry value = GeometrySerializer.deserialize(serializedValues[idx]);
+      for (int groupKey : groupKeysArray[idx]) {
+        unionIntoGroup(groupByResultHolder, groupKey, value);
+      }
+    }
+  }
+
+  /**
+   * @return the geometry stored in the holder, or {@link GeometryUtils#EMPTY_POINT} when the holder has none
+   */
+  @Override
+  public Geometry extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
+    Geometry stored = aggregationResultHolder.getResult();
+    return orEmptyPoint(stored);
+  }
+
+  /**
+   * @return the geometry stored for {@code groupKey}, or {@link GeometryUtils#EMPTY_POINT} when there is none
+   */
+  @Override
+  public Geometry extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+    Geometry stored = groupByResultHolder.getResult(groupKey);
+    return orEmptyPoint(stored);
+  }
+
+  /**
+   * Merges two intermediate results by taking their union.
+   */
+  @Override
+  public Geometry merge(Geometry intermediateResult1, Geometry intermediateResult2) {
+    return intermediateResult1.union(intermediateResult2);
+  }
+
+  /**
+   * @return {@code false}
+   */
+  @Override
+  public boolean isIntermediateResultComparable() {
+    return false;
+  }
+
+  /**
+   * @return {@link DataSchema.ColumnDataType#OBJECT}
+   */
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.OBJECT;
+  }
+
+  /**
+   * @return {@link DataSchema.ColumnDataType#BYTES}
+   */
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.BYTES;
+  }
+
+  /**
+   * Serializes the geometry with {@link GeometrySerializer} and wraps the bytes in a {@link ByteArray}.
+   */
+  @Override
+  public ByteArray extractFinalResult(Geometry geometry) {
+    byte[] serialized = GeometrySerializer.serialize(geometry);
+    return new ByteArray(serialized);
+  }
+
+  /**
+   * Returns {@code value} when nothing has been accumulated yet, otherwise the union of the two.
+   */
+  private static Geometry unionWith(Geometry accumulated, Geometry value) {
+    if (accumulated == null) {
+      return value;
+    }
+    return accumulated.union(value);
+  }
+
+  /**
+   * Unions {@code value} into whatever the holder currently stores for {@code groupKey}.
+   */
+  private static void unionIntoGroup(GroupByResultHolder groupByResultHolder, int groupKey, Geometry value) {
+    Geometry current = groupByResultHolder.getResult(groupKey);
+    groupByResultHolder.setValueForKey(groupKey, unionWith(current, value));
+  }
+
+  /**
+   * Substitutes {@link GeometryUtils#EMPTY_POINT} for a missing geometry.
+   */
+  private static Geometry orEmptyPoint(Geometry geometry) {
+    if (geometry == null) {
+      return GeometryUtils.EMPTY_POINT;
+    }
+    return geometry;
+  }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
new file mode 100644
index 0000000..90b60fb
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/StUnionQueriesTest.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.segment.ReadMode;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.geospatial.GeometryUtils;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
+import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
+import org.apache.pinot.core.operator.query.AggregationOperator;
+import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ByteArray;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.Point;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.AssertJUnit.assertNotNull;
+
+
+/**
+ * Queries test for ST_UNION queries.
+ */
+public class StUnionQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "StUnionQueriesTest");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_RECORDS = 200;
+  private static final int MAX_VALUE = 100000;
+
+  private static final String POINT_COLUMN = "pointColumn";
+  private static final String INT_COLUMN = "intColumn";
+  private static final Schema SCHEMA =
+      new Schema.SchemaBuilder().addSingleValueDimension(POINT_COLUMN, FieldSpec.DataType.BYTES)
+          .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT).build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+
+  // Expected union of the points per intColumn value, built while generating the records.
+  private Map<Integer, Geometry> _values;
+  // Expected union of every generated point.
+  private Geometry _intermediateResult;
+  // Serialized form of _intermediateResult.
+  private byte[] _expectedResults;
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  /**
+   * Generates {@code NUM_RECORDS} rows of a random point plus a random int, records the expected unions (overall
+   * and per int value), then builds the segment and loads it.
+   */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+    // At most one distinct key per record, so size the map for NUM_RECORDS rather than MAX_VALUE.
+    _values = new HashMap<>(HashUtil.getHashMapCapacity(NUM_RECORDS));
+    List<GenericRow> records = new ArrayList<>(NUM_RECORDS);
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      GenericRow record = new GenericRow();
+
+      int x = RANDOM.nextInt(MAX_VALUE);
+      int y = RANDOM.nextInt(MAX_VALUE);
+      Point point = GeometryUtils.GEOMETRY_FACTORY.createPoint(new Coordinate(x, y));
+      byte[] pointBytes = GeometrySerializer.serialize(point);
+      _intermediateResult = _intermediateResult == null ? point : point.union(_intermediateResult);
+      record.putValue(POINT_COLUMN, pointBytes);
+
+      int value = RANDOM.nextInt(MAX_VALUE);
+      record.putValue(INT_COLUMN, value);
+      // First point for a key is stored as is; later ones are unioned into the existing geometry.
+      _values.merge(value, point, Geometry::union);
+      records.add(record);
+    }
+    _expectedResults = GeometrySerializer.serialize(_intermediateResult);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    // The same segment is listed twice for the inter-segment queries.
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  /**
+   * ST_UNION without a filter or group-by: the result must equal the union of every generated point.
+   */
+  @Test
+  public void testAggregationOnly() {
+    String query = "SELECT ST_UNION(pointColumn) FROM testTable";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, NUM_RECORDS,
+        NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+
+    assertNotNull(aggregationResult);
+
+    assertEquals(aggregationResult.get(0), _intermediateResult);
+
+    // Inter segments
+    String[] expectedResults = new String[1];
+    expectedResults[0] = new ByteArray(_expectedResults).toHexString();
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    QueriesTestUtils
+        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 4 * NUM_RECORDS, 4 * NUM_RECORDS,
+            expectedResults);
+    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
+    QueriesTestUtils
+        .testInterSegmentAggregationResult(brokerResponse, 4 * NUM_RECORDS, 0, 4 * NUM_RECORDS, 4 * NUM_RECORDS,
+            expectedResults);
+  }
+
+  /**
+   * ST_UNION over a filter that matches no rows: the result must be {@link GeometryUtils#EMPTY_POINT}.
+   */
+  @Test
+  public void testAggregationOnlyOnEmptyResultSet() {
+    String query = "SELECT ST_UNION(pointColumn) FROM testTable where intColumn=-1";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationOperator) operator).nextBlock();
+    QueriesTestUtils.testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), 0, 0, 0, NUM_RECORDS);
+    List<Object> aggregationResult = resultsBlock.getAggregationResult();
+
+    assertNotNull(aggregationResult);
+
+    assertEquals(aggregationResult.get(0), GeometryUtils.EMPTY_POINT);
+
+    // Inter segments
+    String[] expectedResults = new String[1];
+    expectedResults[0] = new ByteArray(GeometrySerializer.serialize(GeometryUtils.EMPTY_POINT)).toHexString();
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0, 0, 4 * NUM_RECORDS, expectedResults);
+    brokerResponse = getBrokerResponseForPqlQueryWithFilter(query);
+    QueriesTestUtils.testInterSegmentAggregationResult(brokerResponse, 0, 0, 0, 4 * NUM_RECORDS, expectedResults);
+  }
+
+  /**
+   * ST_UNION grouped by intColumn: every returned group must be a generated int value, and the inter-segment
+   * value of each group must equal the serialized expected union for that value.
+   */
+  @Test
+  public void testAggregationGroupBy() {
+    String query = "SELECT ST_UNION(pointColumn) FROM testTable GROUP BY intColumn";
+
+    // Inner segment
+    Operator operator = getOperatorForPqlQuery(query);
+    assertTrue(operator instanceof AggregationGroupByOperator);
+    IntermediateResultsBlock resultsBlock = ((AggregationGroupByOperator) operator).nextBlock();
+    QueriesTestUtils
+        .testInnerSegmentExecutionStatistics(operator.getExecutionStatistics(), NUM_RECORDS, 0, 2 * NUM_RECORDS,
+            NUM_RECORDS);
+    AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult();
+    assertNotNull(aggregationGroupByResult);
+    int numGroups = 0;
+    Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
+    while (groupKeyIterator.hasNext()) {
+      numGroups++;
+      GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+      assertTrue(_values.containsKey(Integer.parseInt(groupKey._stringKey)));
+    }
+    assertEquals(numGroups, _values.size());
+
+    // Inter segments
+    BrokerResponseNative brokerResponse = getBrokerResponseForPqlQuery(query);
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), 4 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), 0);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), 4 * 2 * NUM_RECORDS);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), 4 * NUM_RECORDS);
+    // size of this array will be equal to number of aggregation functions since
+    // we return each aggregation function separately
+    List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+    int numAggregationColumns = aggregationResults.size();
+    Assert.assertEquals(numAggregationColumns, 1);
+    for (AggregationResult aggregationResult : aggregationResults) {
+      Assert.assertNull(aggregationResult.getValue());
+      List<GroupByResult> groupByResults = aggregationResult.getGroupByResult();
+      numGroups = groupByResults.size();
+      for (int i = 0; i < numGroups; i++) {
+        GroupByResult groupByResult = groupByResults.get(i);
+        List<String> group = groupByResult.getGroup();
+        assertEquals(group.size(), 1);
+        int key = Integer.parseInt(group.get(0));
+        assertTrue(_values.containsKey(key));
+        assertEquals(groupByResult.getValue(),
+            new ByteArray(GeometrySerializer.serialize(_values.get(key))).toHexString());
+      }
+    }
+  }
+
+  /**
+   * Releases the loaded segment and removes the on-disk index directory.
+   */
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    // Destroy the mmap-loaded segment before deleting its files. The finally block ensures the
+    // directory is removed even if destroy() throws.
+    try {
+      _indexSegment.destroy();
+    } finally {
+      FileUtils.deleteDirectory(INDEX_DIR);
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]