This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1868004 [ASTERIXDB-2796][RT] Fix IndexOutOfBoundsException in hash
group-by
1868004 is described below
commit 1868004c0a21edb6dfb5e8cd3f900d7c2acce385
Author: Ali Alsuliman <[email protected]>
AuthorDate: Fri Nov 6 12:14:26 2020 -0800
[ASTERIXDB-2796][RT] Fix IndexOutOfBoundsException in hash group-by
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
IndexOutOfBoundsException happens in hash group-by when a WITH
clause exists. The variables from the WITH clause get propagated
through the decor variables of the group-by. The number of output
keys is the sum of both the group-by keys and decor variables, but
the hash functions & comparators used are only for the group-by keys.
- pass only the aggregated keys to the comparators and hash functions.
Change-Id: I19dc652872a1f030b6afa509711e7e0700e86856
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/8743
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Dmitry Lychagin <[email protected]>
---
.../hash-group-by-decor.01.ddl.sqlpp | 31 +++++++++++++
.../hash-group-by-decor.02.update.sqlpp | 30 ++++++++++++
.../hash-group-by-decor.03.query.sqlpp | 28 +++++++++++
.../hash-group-by-decor/hash-group-by-decor.03.adm | 3 ++
.../test/resources/runtimets/testsuite_sqlpp.xml | 5 ++
.../physical/ExternalGroupByPOperator.java | 19 ++------
.../core/rewriter/base/AbstractRuleController.java | 6 +--
.../core/rewriter/base/HeuristicOptimizer.java | 2 +-
.../rules/PushMapOperatorThroughUnionRule.java | 4 +-
.../apache/hyracks/api/exceptions/ErrorCode.java | 2 +-
.../src/main/resources/errormsg/en.properties | 4 +-
.../std/group/HashSpillableTableFactory.java | 47 +++++++++++++------
.../dataflow/std/group/ISpillableTableFactory.java | 8 ++--
.../ExternalGroupBuildOperatorNodePushable.java | 16 +++++--
.../external/ExternalGroupOperatorDescriptor.java | 24 ++++------
.../ExternalGroupWriteOperatorNodePushable.java | 40 ++++++++++------
.../ExternalGroupOperatorDescriptorTest.java | 4 +-
.../hyracks/tests/integration/AggregationTest.java | 54 +++++++++++-----------
.../integration/LocalityAwareConnectorTest.java | 4 +-
.../tests/unit/ExternalHashGroupbyTest.java | 7 +--
.../examples/text/client/WordCountMain.java | 2 +-
.../hyracks/examples/tpch/client/Groupby.java | 2 +-
.../apache/hyracks/examples/tpch/client/Join.java | 2 +-
23 files changed, 235 insertions(+), 109 deletions(-)
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.01.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.01.ddl.sqlpp
new file mode 100644
index 0000000..7d2c396
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.01.ddl.sqlpp
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Description : Testing that runtime of hash-group by handles decor variables
+ * Expected : SUCCESS
+ */
+
+DROP DATAVERSE test IF EXISTS;
+CREATE DATAVERSE test;
+
+USE test;
+
+CREATE TYPE t1 AS {id: int};
+CREATE DATASET ds1(t1) PRIMARY KEY id;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.02.update.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.02.update.sqlpp
new file mode 100644
index 0000000..e79903f
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.02.update.sqlpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+INSERT INTO ds1 {"id": 1, "dept_id": 1, "salary": 10, "area": "N"};
+INSERT INTO ds1 {"id": 2, "dept_id": 1, "salary": 10, "area": "N"};
+INSERT INTO ds1 {"id": 3, "dept_id": 1, "salary": 10, "area": "A"};
+INSERT INTO ds1 {"id": 4, "dept_id": 2, "salary": 20, "area": "N"};
+INSERT INTO ds1 {"id": 5, "dept_id": 2, "salary": 20, "area": "N"};
+INSERT INTO ds1 {"id": 6, "dept_id": 2, "salary": 20, "area": "A"};
+INSERT INTO ds1 {"id": 7, "dept_id": 3, "salary": 30, "area": "N"};
+INSERT INTO ds1 {"id": 8, "dept_id": 3, "salary": 30, "area": "N"};
+INSERT INTO ds1 {"id": 9, "dept_id": 3, "salary": 30, "area": "A"};
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.03.query.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.03.query.sqlpp
new file mode 100644
index 0000000..39cd32f
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/group-by/hash-group-by-decor/hash-group-by-decor.03.query.sqlpp
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+USE test;
+
+WITH a AS "N"
+FROM ds1 v
+WHERE v.area = a
+/*+ hash */
+GROUP BY v.dept_id
+SELECT v.dept_id, SUM(v.salary) AS total, a
+ORDER BY v.dept_id;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/hash-group-by-decor/hash-group-by-decor.03.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/hash-group-by-decor/hash-group-by-decor.03.adm
new file mode 100644
index 0000000..c79ed9a
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/group-by/hash-group-by-decor/hash-group-by-decor.03.adm
@@ -0,0 +1,3 @@
+{ "a": "N", "dept_id": 1, "total": 20 }
+{ "a": "N", "dept_id": 2, "total": 40 }
+{ "a": "N", "dept_id": 3, "total": 60 }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index 6d56e70..adbdf69 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -5808,6 +5808,11 @@
<output-dir compare="Text">group-by-all-ASTERIXDB-2611</output-dir>
</compilation-unit>
</test-case>
+ <test-case FilePath="group-by">
+ <compilation-unit name="hash-group-by-decor">
+ <output-dir compare="Text">hash-group-by-decor</output-dir>
+ </compilation-unit>
+ </test-case>
</test-group>
<test-group name="index-join">
<test-case FilePath="index-join">
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
index 945e36c..7515258 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/ExternalGroupByPOperator.java
@@ -123,8 +123,8 @@ public class ExternalGroupByPOperator extends
AbstractGroupByPOperator {
GroupByOperator gby = (GroupByOperator) op;
checkGroupAll(gby);
List<LogicalVariable> gbyCols = getGroupByColumns();
- int keys[] = JobGenHelper.variablesToFieldIndexes(gbyCols,
inputSchemas[0]);
- int fdColumns[] = getFdColumns(gby, inputSchemas[0]);
+ int[] gbyColumns = JobGenHelper.variablesToFieldIndexes(gbyCols,
inputSchemas[0]);
+ int[] fdColumns = getFdColumns(gby, inputSchemas[0]);
if (gby.getNestedPlans().size() != 1) {
throw new AlgebricksException(
@@ -161,14 +161,6 @@ public class ExternalGroupByPOperator extends
AbstractGroupByPOperator {
.add(partialAggregationTypeComputer.getType(aggFun,
aggOpInputEnv, context.getMetadataProvider()));
}
- int[] keyAndDecFields = new int[keys.length + fdColumns.length];
- for (i = 0; i < keys.length; ++i) {
- keyAndDecFields[i] = keys[i];
- }
- for (i = 0; i < fdColumns.length; i++) {
- keyAndDecFields[keys.length + i] = fdColumns[i];
- }
-
List<LogicalVariable> keyAndDecVariables = new
ArrayList<LogicalVariable>();
for (Pair<LogicalVariable, Mutable<ILogicalExpression>> p :
gby.getGroupByList()) {
keyAndDecVariables.add(p.first);
@@ -225,15 +217,14 @@ public class ExternalGroupByPOperator extends
AbstractGroupByPOperator {
// Calculates the hash table size (# of unique hash values) based on
the budget and a tuple size.
int frameSize = context.getFrameSize();
long memoryBudgetInBytes =
localMemoryRequirements.getMemoryBudgetInBytes(frameSize);
- int numFds = gby.getDecorList().size();
- int groupByColumnsCount = gby.getGroupByList().size() + numFds;
+ int allColumns = gbyColumns.length + fdColumns.length;
int hashTableSize =
ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(memoryBudgetInBytes,
- groupByColumnsCount, frameSize);
+ allColumns, frameSize);
int framesLimit = localMemoryRequirements.getMemoryBudgetInFrames();
long inputSize = framesLimit * (long) frameSize;
ExternalGroupOperatorDescriptor gbyOpDesc = new
ExternalGroupOperatorDescriptor(spec, hashTableSize, inputSize,
- keyAndDecFields, framesLimit, comparatorFactories,
normalizedKeyFactory, aggregatorFactory,
+ gbyColumns, fdColumns, framesLimit, comparatorFactories,
normalizedKeyFactory, aggregatorFactory,
mergeFactory, recordDescriptor, recordDescriptor, new
HashSpillableTableFactory(hashFunctionFactories));
gbyOpDesc.setSourceLocation(gby.getSourceLocation());
contributeOpDesc(builder, gby, gbyOpDesc);
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
index 4287b26..0261106 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AbstractRuleController.java
@@ -146,7 +146,7 @@ public abstract class AbstractRuleController {
try {
context.getPlanStabilityVerifier().recordPlanSignature(opRef);
} catch (AlgebricksException e) {
- throw
AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
String.format("Illegal state before rule %s. %s",
rule.getClass().getName(), e.getMessage()));
}
}
@@ -157,7 +157,7 @@ public abstract class AbstractRuleController {
try {
context.getPlanStructureVerifier().verifyPlanStructure(opRef);
} catch (AlgebricksException e) {
- throw
AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
String.format("Fired rule %s produced illegal %s",
rule.getClass().getName(), e.getMessage()));
}
} else {
@@ -168,7 +168,7 @@ public abstract class AbstractRuleController {
printRuleApplication(rule, "not fired, but failed sanity
check: " + e.getMessage(), beforePlan,
getPlanString(opRef));
}
- throw
AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
String.format("Non-fired rule %s unexpectedly %s",
rule.getClass().getName(), e.getMessage()));
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
index ab06488..f93d1f9 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/HeuristicOptimizer.java
@@ -128,7 +128,7 @@ public class HeuristicOptimizer {
context.getPlanStructureVerifier().verifyPlanStructure(opRef);
}
} catch (AlgebricksException e) {
- throw
AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
String.format("Initial plan contains illegal %s",
e.getMessage()));
}
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
index 3c468be..32dca00 100644
---
a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
+++
b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PushMapOperatorThroughUnionRule.java
@@ -126,12 +126,12 @@ public abstract class PushMapOperatorThroughUnionRule
implements IAlgebraicRewri
for (LogicalVariable opProducedVar : opProducedVars) {
LogicalVariable leftBranchProducedVar =
leftBranchProducedVarMap.get(opProducedVar);
if (leftBranchProducedVar == null) {
- throw
AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
op.getSourceLocation(),
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
op.getSourceLocation(),
"Cannot find " + opProducedVar);
}
LogicalVariable rightBranchProducedVar =
rightBranchProducedVarMap.get(opProducedVar);
if (rightBranchProducedVar == null) {
- throw
AlgebricksException.create(ErrorCode.COMPILATION_ILLEGAL_STATE,
op.getSourceLocation(),
+ throw AlgebricksException.create(ErrorCode.ILLEGAL_STATE,
op.getSourceLocation(),
"Cannot find " + opProducedVar);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index a93da48..db60f1a 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -159,6 +159,7 @@ public class ErrorCode {
public static final int INSUFFICIENT_MEMORY = 123;
public static final int PARSING_ERROR = 124;
public static final int INVALID_INVERTED_LIST_TYPE_TRAITS = 125;
+ public static final int ILLEGAL_STATE = 126;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
@@ -170,7 +171,6 @@ public class ErrorCode {
public static final int INAPPLICABLE_HINT = 10006;
public static final int CROSS_PRODUCT_JOIN = 10007;
public static final int GROUP_ALL_DECOR = 10008;
- public static final int COMPILATION_ILLEGAL_STATE = 10009;
private static class Holder {
private static final Map<Integer, String> errorMessageMap;
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index 01895a6..c01f39a 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -143,6 +143,7 @@
123 = Insufficient memory is provided for the join operators, please increase
the join memory budget.
124 = Parsing error at %1$s line %2$s field %3$s: %4$s
125 = Invalid inverted list type traits: %1$s
+126 = Illegal state. %1$s
10000 = The given rule collection %1$s is not an instance of the List class.
10001 = Cannot compose partition constraint %1$s with %2$s
@@ -152,5 +153,4 @@
10005 = Operator is not implemented: %1$s
10006 = Could not apply %1$s hint: %2$s
10007 = Encountered a cross product join
-10008 = Inappropriate use of group by all with decor variables
-10009 = Illegal state. %1$s
\ No newline at end of file
+10008 = Inappropriate use of group by all with decor variables
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
index 3232527..4f0c304 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/HashSpillableTableFactory.java
@@ -66,7 +66,7 @@ public class HashSpillableTableFactory implements
ISpillableTableFactory {
@Override
public ISpillableTable buildSpillableTable(final IHyracksTaskContext ctx,
int suggestTableSize,
- long inputDataBytesSize, final int[] keyFields, final
IBinaryComparator[] comparators,
+ long inputDataBytesSize, final int[] gbyFields, final int[]
fdFields, final IBinaryComparator[] comparators,
final INormalizedKeyComputer firstKeyNormalizerFactory,
IAggregatorDescriptorFactory aggregateFactory,
RecordDescriptor inRecordDescriptor, RecordDescriptor
outRecordDescriptor, final int framesLimit,
final int seed) throws HyracksDataException {
@@ -79,26 +79,47 @@ public class HashSpillableTableFactory implements
ISpillableTableFactory {
throw new HyracksDataException("The given frame limit is too small
to partition the data.");
}
- final int[] intermediateResultKeys = new int[keyFields.length];
- for (int i = 0; i < keyFields.length; i++) {
- intermediateResultKeys[i] = i;
+ final int[] intermediateResultGbyFields = new int[gbyFields.length];
+ for (int i = 0; i < gbyFields.length; i++) {
+ intermediateResultGbyFields[i] = i;
+ }
+
+ final int[] allFields;
+ final int[] intermediateResultAllFields;
+ if (fdFields == null) {
+ // no need to combine gby and fd
+ allFields = gbyFields;
+ intermediateResultAllFields = intermediateResultGbyFields;
+ } else {
+ allFields = new int[gbyFields.length + fdFields.length];
+ intermediateResultAllFields = new int[gbyFields.length +
fdFields.length];
+ int k = 0;
+ int position = 0;
+ for (int i = 0; i < gbyFields.length; i++, position++, k++) {
+ allFields[k] = gbyFields[i];
+ intermediateResultAllFields[k] = position;
+ }
+ for (int i = 0; i < fdFields.length; i++, position++, k++) {
+ allFields[k] = fdFields[i];
+ intermediateResultAllFields[k] = position;
+ }
}
final FrameTuplePairComparator ftpcInputCompareToAggregate =
- new FrameTuplePairComparator(keyFields,
intermediateResultKeys, comparators);
+ new FrameTuplePairComparator(gbyFields,
intermediateResultGbyFields, comparators);
final ITuplePartitionComputer tpc =
- new FieldHashPartitionComputerFamily(keyFields,
hashFunctionFamilies).createPartitioner(seed);
+ new FieldHashPartitionComputerFamily(gbyFields,
hashFunctionFamilies).createPartitioner(seed);
// For calculating hash value for the already aggregated tuples (not
incoming tuples)
// This computer is required to calculate the hash value of a
aggregated tuple
// while doing the garbage collection work on Hash Table.
final ITuplePartitionComputer tpcIntermediate =
- new FieldHashPartitionComputerFamily(intermediateResultKeys,
hashFunctionFamilies)
+ new
FieldHashPartitionComputerFamily(intermediateResultGbyFields,
hashFunctionFamilies)
.createPartitioner(seed);
final IAggregatorDescriptor aggregator =
aggregateFactory.createAggregator(ctx, inRecordDescriptor,
- outRecordDescriptor, keyFields, intermediateResultKeys, null,
-1);
+ outRecordDescriptor, allFields, intermediateResultAllFields,
null, -1);
final AggregateState aggregateState =
aggregator.createAggregateStates();
@@ -225,8 +246,8 @@ public class HashSpillableTableFactory implements
ISpillableTableFactory {
private void initStateTupleBuilder(IFrameTupleAccessor accessor,
int tIndex) throws HyracksDataException {
stateTupleBuilder.reset();
- for (int k = 0; k < keyFields.length; k++) {
- stateTupleBuilder.addField(accessor, tIndex, keyFields[k]);
+ for (int k = 0; k < allFields.length; k++) {
+ stateTupleBuilder.addField(accessor, tIndex, allFields[k]);
}
aggregator.init(stateTupleBuilder, accessor, tIndex,
aggregateState);
}
@@ -246,10 +267,10 @@ public class HashSpillableTableFactory implements
ISpillableTableFactory {
hashTableForTuplePointer.getTuplePointer(hashEntryPid,
tid, pointer);
bufferAccessor.reset(pointer);
outputTupleBuilder.reset();
- for (int k = 0; k < intermediateResultKeys.length;
k++) {
+ for (int k = 0; k <
intermediateResultAllFields.length; k++) {
outputTupleBuilder.addField(bufferAccessor.getBuffer().array(),
-
bufferAccessor.getAbsFieldStartOffset(intermediateResultKeys[k]),
-
bufferAccessor.getFieldLength(intermediateResultKeys[k]));
+
bufferAccessor.getAbsFieldStartOffset(intermediateResultAllFields[k]),
+
bufferAccessor.getFieldLength(intermediateResultAllFields[k]));
}
boolean hasOutput = false;
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
index 629f211..c60b29a 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/ISpillableTableFactory.java
@@ -27,9 +27,11 @@ import
org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
public interface ISpillableTableFactory extends Serializable {
+
ISpillableTable buildSpillableTable(IHyracksTaskContext ctx, int
inputSizeInTuple, long dataBytesSize,
- int[] keyFields, IBinaryComparator[] comparatorFactories,
INormalizedKeyComputer firstKeyNormalizerFactory,
- IAggregatorDescriptorFactory aggregateFactory, RecordDescriptor
inRecordDescriptor,
- RecordDescriptor outRecordDescriptor, int framesLimit, int seed)
throws HyracksDataException;
+ int[] gbyFields, int[] fdFields, IBinaryComparator[]
comparatorFactories,
+ INormalizedKeyComputer firstKeyNormalizerFactory,
IAggregatorDescriptorFactory aggregateFactory,
+ RecordDescriptor inRecordDescriptor, RecordDescriptor
outRecordDescriptor, int framesLimit, int seed)
+ throws HyracksDataException;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
index 20d223e..02cee04 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupBuildOperatorNodePushable.java
@@ -26,6 +26,7 @@ import
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.io.RunFileWriter;
@@ -47,7 +48,8 @@ public class ExternalGroupBuildOperatorNodePushable extends
AbstractUnaryInputSi
private static final Logger LOGGER = LogManager.getLogger();
private final IHyracksTaskContext ctx;
private final Object stateId;
- private final int[] keyFields;
+ private final int[] gbyFields;
+ private final int[] fdFields; // nullable
private final IBinaryComparator[] comparators;
private final INormalizedKeyComputer firstNormalizerComputer;
private final IAggregatorDescriptorFactory aggregatorFactory;
@@ -63,15 +65,19 @@ public class ExternalGroupBuildOperatorNodePushable extends
AbstractUnaryInputSi
private boolean isFailed = false;
public ExternalGroupBuildOperatorNodePushable(IHyracksTaskContext ctx,
Object stateId, int tableSize, long fileSize,
- int[] keyFields, int framesLimit, IBinaryComparatorFactory[]
comparatorFactories,
+ int[] gbyFields, int[] fdFields, int framesLimit,
IBinaryComparatorFactory[] comparatorFactories,
INormalizedKeyComputerFactory firstNormalizerFactory,
IAggregatorDescriptorFactory aggregatorFactory,
RecordDescriptor inRecordDescriptor, RecordDescriptor
outRecordDescriptor,
- ISpillableTableFactory spillableTableFactory) {
+ ISpillableTableFactory spillableTableFactory) throws
HyracksDataException {
+ if (comparatorFactories.length != gbyFields.length) {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
"mismatch in group by fields and comparators");
+ }
this.ctx = ctx;
this.stateId = stateId;
this.framesLimit = framesLimit;
this.aggregatorFactory = aggregatorFactory;
- this.keyFields = keyFields;
+ this.gbyFields = gbyFields;
+ this.fdFields = fdFields;
this.comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
@@ -88,7 +94,7 @@ public class ExternalGroupBuildOperatorNodePushable extends
AbstractUnaryInputSi
@Override
public void open() throws HyracksDataException {
state = new ExternalGroupState(ctx.getJobletContext().getJobId(),
stateId);
- ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx,
tableSize, fileSize, keyFields,
+ ISpillableTable table = spillableTableFactory.buildSpillableTable(ctx,
tableSize, fileSize, gbyFields, fdFields,
comparators, firstNormalizerComputer, aggregatorFactory,
inRecordDescriptor, outRecordDescriptor,
framesLimit, INIT_SEED);
RunFileWriter[] runFileWriters = new
RunFileWriter[table.getNumPartitions()];
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
index 6dea186..7e23ac3 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptor.java
@@ -44,11 +44,11 @@ public class ExternalGroupOperatorDescriptor extends
AbstractOperatorDescriptor
private static final int MERGE_ACTIVITY_ID = 1;
- private static final long serialVersionUID = 1L;
- private final int[] keyFields;
+ private static final long serialVersionUID = 2L;
+ private final int[] gbyFields;
+ private final int[] fdFields; // nullable
private final IBinaryComparatorFactory[] comparatorFactories;
private final INormalizedKeyComputerFactory firstNormalizerFactory;
-
private final IAggregatorDescriptorFactory partialAggregatorFactory;
private final IAggregatorDescriptorFactory intermediateAggregateFactory;
@@ -60,7 +60,7 @@ public class ExternalGroupOperatorDescriptor extends
AbstractOperatorDescriptor
private final long fileSize;
public ExternalGroupOperatorDescriptor(IOperatorDescriptorRegistry spec,
int inputSizeInTuple, long inputFileSize,
- int[] keyFields, int framesLimit, IBinaryComparatorFactory[]
comparatorFactories,
+ int[] gbyFields, int[] fdFields, int framesLimit,
IBinaryComparatorFactory[] comparatorFactories,
INormalizedKeyComputerFactory firstNormalizerFactory,
IAggregatorDescriptorFactory partialAggregatorFactory,
IAggregatorDescriptorFactory intermediateAggregateFactory,
RecordDescriptor partialAggRecordDesc,
RecordDescriptor outRecordDesc, ISpillableTableFactory
spillableTableFactory) {
@@ -76,7 +76,8 @@ public class ExternalGroupOperatorDescriptor extends
AbstractOperatorDescriptor
}
this.partialAggregatorFactory = partialAggregatorFactory;
this.intermediateAggregateFactory = intermediateAggregateFactory;
- this.keyFields = keyFields;
+ this.gbyFields = gbyFields;
+ this.fdFields = fdFields;
this.comparatorFactories = comparatorFactories;
this.firstNormalizerFactory = firstNormalizerFactory;
this.spillableTableFactory = spillableTableFactory;
@@ -93,13 +94,6 @@ public class ExternalGroupOperatorDescriptor extends
AbstractOperatorDescriptor
this.fileSize = inputFileSize;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * org.apache.hyracks.api.dataflow.IOperatorDescriptor#contributeActivities
- * (org.apache.hyracks.api.dataflow.IActivityGraphBuilder)
- */
@Override
public void contributeActivities(IActivityGraphBuilder builder) {
AggregateActivity aggregateAct = new AggregateActivity(new
ActivityId(getOperatorId(), AGGREGATE_ACTIVITY_ID));
@@ -126,7 +120,7 @@ public class ExternalGroupOperatorDescriptor extends
AbstractOperatorDescriptor
final IRecordDescriptorProvider recordDescProvider, final int
partition, int nPartitions)
throws HyracksDataException {
return new ExternalGroupBuildOperatorNodePushable(ctx, new
TaskId(getActivityId(), partition), tableSize,
- fileSize, keyFields, framesLimit, comparatorFactories,
firstNormalizerFactory,
+ fileSize, gbyFields, fdFields, framesLimit,
comparatorFactories, firstNormalizerFactory,
partialAggregatorFactory,
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0),
outRecDescs[0], spillableTableFactory);
}
@@ -145,8 +139,8 @@ public class ExternalGroupOperatorDescriptor extends
AbstractOperatorDescriptor
throws HyracksDataException {
return new ExternalGroupWriteOperatorNodePushable(ctx,
new TaskId(new ActivityId(getOperatorId(),
AGGREGATE_ACTIVITY_ID), partition),
- spillableTableFactory, partialRecDesc, outRecDesc,
framesLimit, keyFields, firstNormalizerFactory,
- comparatorFactories, intermediateAggregateFactory);
+ spillableTableFactory, partialRecDesc, outRecDesc,
framesLimit, gbyFields, fdFields,
+ firstNormalizerFactory, comparatorFactories,
intermediateAggregateFactory);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
index e8a1b76..1a6f4ef 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupWriteOperatorNodePushable.java
@@ -28,6 +28,7 @@ import
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputer;
import org.apache.hyracks.api.dataflow.value.INormalizedKeyComputerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
+import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.dataflow.common.io.RunFileReader;
@@ -42,6 +43,7 @@ import org.apache.logging.log4j.Logger;
public class ExternalGroupWriteOperatorNodePushable extends
AbstractUnaryOutputSourceOperatorNodePushable
implements IRunFileWriterGenerator {
+
private static final Logger LOGGER = LogManager.getLogger();
private final IHyracksTaskContext ctx;
private final Object stateId;
@@ -49,7 +51,8 @@ public class ExternalGroupWriteOperatorNodePushable extends
AbstractUnaryOutputS
private final RecordDescriptor partialAggRecordDesc;
private final RecordDescriptor outRecordDesc;
private final IAggregatorDescriptorFactory mergeAggregatorFactory;
- private final int[] mergeGroupFields;
+ private final int[] gbyFields;
+ private final int[] fdFields; // nullable
private final IBinaryComparator[] groupByComparators;
private final int frameLimit;
private final INormalizedKeyComputer nmkComputer;
@@ -57,29 +60,38 @@ public class ExternalGroupWriteOperatorNodePushable extends
AbstractUnaryOutputS
public ExternalGroupWriteOperatorNodePushable(IHyracksTaskContext ctx,
Object stateId,
ISpillableTableFactory spillableTableFactory, RecordDescriptor
partialAggRecordDesc,
- RecordDescriptor outRecordDesc, int framesLimit, int[] groupFields,
+ RecordDescriptor outRecordDesc, int framesLimit, int[] gbyFields,
int[] fdFields,
INormalizedKeyComputerFactory nmkFactory,
IBinaryComparatorFactory[] comparatorFactories,
- IAggregatorDescriptorFactory aggregatorFactory) {
+ IAggregatorDescriptorFactory aggregatorFactory) throws
HyracksDataException {
+ if (comparatorFactories.length != gbyFields.length) {
+ throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
"mismatch in group by fields and comparators");
+ }
this.ctx = ctx;
this.stateId = stateId;
this.spillableTableFactory = spillableTableFactory;
this.frameLimit = framesLimit;
this.nmkComputer = nmkFactory == null ? null :
nmkFactory.createNormalizedKeyComputer();
-
this.partialAggRecordDesc = partialAggRecordDesc;
this.outRecordDesc = outRecordDesc;
-
this.mergeAggregatorFactory = aggregatorFactory;
//create merge group fields
- int numGroupFields = groupFields.length;
- mergeGroupFields = new int[numGroupFields];
- for (int i = 0; i < numGroupFields; i++) {
- mergeGroupFields[i] = i;
+ this.gbyFields = new int[gbyFields.length];
+ int position = 0;
+ for (int i = 0; i < this.gbyFields.length; i++, position++) {
+ this.gbyFields[i] = position;
+ }
+ if (fdFields != null) {
+ this.fdFields = new int[fdFields.length];
+ for (int i = 0; i < this.fdFields.length; i++, position++) {
+ this.fdFields[i] = position;
+ }
+ } else {
+ this.fdFields = null;
}
//setup comparators for grouping
- groupByComparators = new
IBinaryComparator[Math.min(mergeGroupFields.length,
comparatorFactories.length)];
+ groupByComparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < groupByComparators.length; i++) {
groupByComparators[i] =
comparatorFactories[i].createBinaryComparator();
}
@@ -122,12 +134,12 @@ public class ExternalGroupWriteOperatorNodePushable
extends AbstractUnaryOutputS
if (runs[i] != null) {
// Calculates the hash table size (# of unique hash values)
based on the budget and a tuple size.
int memoryBudgetInBytes = ctx.getInitialFrameSize() *
frameLimit;
- int groupByColumnsCount = mergeGroupFields.length;
- int hashTableCardinality =
ExternalGroupOperatorDescriptor.calculateGroupByTableCardinality(
- memoryBudgetInBytes, groupByColumnsCount,
ctx.getInitialFrameSize());
+ int allFields = gbyFields.length + (fdFields == null ? 0 :
fdFields.length);
+ int hashTableCardinality = ExternalGroupOperatorDescriptor
+ .calculateGroupByTableCardinality(memoryBudgetInBytes,
allFields, ctx.getInitialFrameSize());
hashTableCardinality = Math.min(hashTableCardinality,
numOfTuples[i]);
ISpillableTable partitionTable =
spillableTableFactory.buildSpillableTable(ctx, hashTableCardinality,
- runs[i].getFileSize(), mergeGroupFields,
groupByComparators, nmkComputer,
+ runs[i].getFileSize(), gbyFields, fdFields,
groupByComparators, nmkComputer,
mergeAggregatorFactory, partialAggRecordDesc,
outRecordDesc, frameLimit, level);
RunFileWriter[] runFileWriters = new
RunFileWriter[partitionTable.getNumPartitions()];
int[] sizeInTuplesNextLevel =
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
index 794ff98..067c9a7 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/test/java/org/apache/hyracks/dataflow/std/group/external/ExternalGroupOperatorDescriptorTest.java
@@ -33,8 +33,8 @@ public class ExternalGroupOperatorDescriptorTest {
// Sets a dummy variable.
IOperatorDescriptorRegistry spec = new JobSpecification(32768);
- ExternalGroupOperatorDescriptor eGByOp =
- new ExternalGroupOperatorDescriptor(spec, 0, 0, null, 4, null,
null, null, null, null, null, null);
+ ExternalGroupOperatorDescriptor eGByOp = new
ExternalGroupOperatorDescriptor(spec, 0, 0, null, null, 4, null,
+ null, null, null, null, null, null);
// Test 1: compiler.groupmemory: 512 bytes, frame size: 256 bytes,
with 1 column group-by
long memoryBudgetInBytes = 512;
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
index 0a57232..ae718bb 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/AggregationTest.java
@@ -163,17 +163,18 @@ public class AggregationTest extends
AbstractIntegrationTest {
int tableSize = 8;
long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new
ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, frameLimits, new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(new
IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false), new
IntSumFieldAggregatorFactory(3, false),
- new FloatSumFieldAggregatorFactory(5, false) }),
- new MultiFieldsAggregatorFactory(new
IFieldAggregateDescriptorFactory[] {
- new IntSumFieldAggregatorFactory(1, false), new
IntSumFieldAggregatorFactory(2, false),
- new FloatSumFieldAggregatorFactory(3, false) }),
- outputRec, outputRec, new HashSpillableTableFactory(
- new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+ ExternalGroupOperatorDescriptor grouper =
+ new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
keyFields, null, frameLimits,
+ new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(new
IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
new IntSumFieldAggregatorFactory(3, false),
+ new FloatSumFieldAggregatorFactory(5, false)
}),
+ new MultiFieldsAggregatorFactory(new
IFieldAggregateDescriptorFactory[] {
+ new IntSumFieldAggregatorFactory(1, false),
new IntSumFieldAggregatorFactory(2, false),
+ new FloatSumFieldAggregatorFactory(3, false)
}),
+ outputRec, outputRec, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
NC2_ID, NC1_ID);
@@ -253,7 +254,7 @@ public class AggregationTest extends
AbstractIntegrationTest {
long fileSize = frameLimits * spec.getFrameSize();
ExternalGroupOperatorDescriptor grouper =
- new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
keyFields, frameLimits,
+ new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
keyFields, null, frameLimits,
new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new
IFieldAggregateDescriptorFactory[] {
@@ -341,17 +342,18 @@ public class AggregationTest extends
AbstractIntegrationTest {
int tableSize = 8;
long fileSize = frameLimits * spec.getFrameSize();
- ExternalGroupOperatorDescriptor grouper = new
ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, frameLimits, new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE },
- new UTF8StringNormalizedKeyComputerFactory(),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new
IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(15,
true, true) }),
- new MultiFieldsAggregatorFactory(
- new IFieldAggregateDescriptorFactory[] { new
IntSumFieldAggregatorFactory(1, false),
- new MinMaxStringFieldAggregatorFactory(2,
true, true) }),
- outputRec, outputRec, new HashSpillableTableFactory(
- new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE }));
+ ExternalGroupOperatorDescriptor grouper =
+ new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
keyFields, null, frameLimits,
+ new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE },
+ new UTF8StringNormalizedKeyComputerFactory(),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new
IntSumFieldAggregatorFactory(1, false),
+ new
MinMaxStringFieldAggregatorFactory(15, true, true) }),
+ new MultiFieldsAggregatorFactory(
+ new IFieldAggregateDescriptorFactory[] { new
IntSumFieldAggregatorFactory(1, false),
+ new
MinMaxStringFieldAggregatorFactory(2, true, true) }),
+ outputRec, outputRec, new HashSpillableTableFactory(
+ new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE }));
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, grouper,
NC2_ID, NC1_ID);
@@ -432,7 +434,7 @@ public class AggregationTest extends
AbstractIntegrationTest {
long fileSize = frameLimits * spec.getFrameSize();
ExternalGroupOperatorDescriptor grouper = new
ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, frameLimits,
+ keyFields, null, frameLimits,
new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE,
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
@@ -527,7 +529,7 @@ public class AggregationTest extends
AbstractIntegrationTest {
long fileSize = frameLimits * spec.getFrameSize();
ExternalGroupOperatorDescriptor grouper =
- new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
keyFields, frameLimits,
+ new ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
keyFields, null, frameLimits,
new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE,
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
@@ -624,7 +626,7 @@ public class AggregationTest extends
AbstractIntegrationTest {
long fileSize = frameLimits * spec.getFrameSize();
ExternalGroupOperatorDescriptor grouper = new
ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, frameLimits,
+ keyFields, null, frameLimits,
new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE,
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
index c286e52..5a8ec34 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/LocalityAwareConnectorTest.java
@@ -126,7 +126,7 @@ public class LocalityAwareConnectorTest extends
AbstractMultiNCIntegrationTest {
int tableSize = 8;
ExternalGroupOperatorDescriptor grouper = new
ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, fileSize / spec.getFrameSize() + 1,
+ keyFields, null, fileSize / spec.getFrameSize() + 1,
new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new
IFieldAggregateDescriptorFactory[] {
@@ -191,7 +191,7 @@ public class LocalityAwareConnectorTest extends
AbstractMultiNCIntegrationTest {
int tableSize = 8;
ExternalGroupOperatorDescriptor grouper = new
ExternalGroupOperatorDescriptor(spec, tableSize, fileSize,
- keyFields, fileSize / spec.getFrameSize() + 1,
+ keyFields, null, fileSize / spec.getFrameSize() + 1,
new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new
IFieldAggregateDescriptorFactory[] {
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
index f1a4231..75ecc34 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/unit/ExternalHashGroupbyTest.java
@@ -22,6 +22,7 @@ package org.apache.hyracks.tests.unit;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
import
org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
import
org.apache.hyracks.dataflow.std.base.AbstractUnaryOutputSourceOperatorNodePushable;
import org.apache.hyracks.dataflow.std.group.HashSpillableTableFactory;
@@ -34,14 +35,14 @@ public class ExternalHashGroupbyTest extends
AbstractExternalGroupbyTest {
ExternalGroupWriteOperatorNodePushable mergeOperator;
@Override
- protected void initial(IHyracksTaskContext ctx, int tableSize, int
numFrames) {
+ protected void initial(IHyracksTaskContext ctx, int tableSize, int
numFrames) throws HyracksDataException {
ISpillableTableFactory tableFactory = new HashSpillableTableFactory(
new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE });
buildOperator = new ExternalGroupBuildOperatorNodePushable(ctx,
this.hashCode(), tableSize,
- numFrames * ctx.getInitialFrameSize(), keyFields, numFrames,
comparatorFactories,
+ numFrames * ctx.getInitialFrameSize(), keyFields, null,
numFrames, comparatorFactories,
normalizedKeyComputerFactory, partialAggrInPlace,
inRecordDesc, outputRec, tableFactory);
mergeOperator = new ExternalGroupWriteOperatorNodePushable(ctx,
this.hashCode(), tableFactory, outputRec,
- outputRec, numFrames, keyFieldsAfterPartial,
normalizedKeyComputerFactory, comparatorFactories,
+ outputRec, numFrames, keyFieldsAfterPartial, null,
normalizedKeyComputerFactory, comparatorFactories,
finalAggrInPlace);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
index cf04bee..2bc742a 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/text-example/textclient/src/main/java/org/apache/hyracks/examples/text/client/WordCountMain.java
@@ -153,7 +153,7 @@ public class WordCountMain {
IOperatorDescriptor gBy;
int[] keys = new int[] { 0 };
if ("hash".equalsIgnoreCase(algo)) {
- gBy = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize,
keys, frameLimit,
+ gBy = new ExternalGroupOperatorDescriptor(spec, htSize, fileSize,
keys, null, frameLimit,
new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new
IFieldAggregateDescriptorFactory[] {
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
index 72660c0..a14b44d 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Groupby.java
@@ -148,7 +148,7 @@ public class Groupby {
AbstractOperatorDescriptor grouper;
if (alg.equalsIgnoreCase("hash")) {// external hash graph
- grouper = new ExternalGroupOperatorDescriptor(spec, htSize,
fileSize, keys, frameLimit,
+ grouper = new ExternalGroupOperatorDescriptor(spec, htSize,
fileSize, keys, null, frameLimit,
new IBinaryComparatorFactory[] {
IntegerBinaryComparatorFactory.INSTANCE },
new IntegerNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 209bf34..3bbeca8 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -221,7 +221,7 @@ public class Join {
new UTF8StringSerializerDeserializer(),
IntegerSerializerDeserializer.INSTANCE });
ExternalGroupOperatorDescriptor gby = new
ExternalGroupOperatorDescriptor(spec, tableSize,
- custFileSize + orderFileSize, new int[] { 6 }, memSize,
+ custFileSize + orderFileSize, new int[] { 6 }, null,
memSize,
new IBinaryComparatorFactory[] {
UTF8StringBinaryComparatorFactory.INSTANCE },
new UTF8StringNormalizedKeyComputerFactory(),
new MultiFieldsAggregatorFactory(new
IFieldAggregateDescriptorFactory[] {