[
https://issues.apache.org/jira/browse/TAJO-1010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14131131#comment-14131131
]
ASF GitHub Bot commented on TAJO-1010:
--------------------------------------
Github user blrunner commented on a diff in the pull request:
https://github.com/apache/tajo/pull/136#discussion_r17463756
--- Diff:
tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/DistinctGroupbyThirdAggregationExec.java
---
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.datum.NullDatum;
+import org.apache.tajo.engine.eval.AggregationFunctionCallEval;
+import org.apache.tajo.engine.function.FunctionContext;
+import org.apache.tajo.engine.planner.Target;
+import org.apache.tajo.engine.planner.logical.DistinctGroupbyNode;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.worker.TaskAttemptContext;
+
+import java.io.IOException;
+import java.util.*;
+
+public class DistinctGroupbyThirdAggregationExec extends UnaryPhysicalExec
{
+ private static Log LOG =
LogFactory.getLog(DistinctGroupbyThirdAggregationExec.class);
+ private DistinctGroupbyNode plan;
+ private PhysicalExec child;
+
+ private boolean finished = false;
+
+ private DistinctFinalAggregator[] aggregators;
+ private DistinctFinalAggregator nonDistinctAggr;
+
+ private int resultTupleLength;
+ private int numGroupingColumns;
+
+ private int[] resultTupleIndexes;
+
+ public DistinctGroupbyThirdAggregationExec(TaskAttemptContext context,
DistinctGroupbyNode plan, SortExec sortExec)
+ throws IOException {
+ super(context, plan.getInSchema(), plan.getOutSchema(), sortExec);
+ this.plan = plan;
+ this.child = sortExec;
+ }
+
+ @Override
+ public void init() throws IOException {
+ this.child.init();
+
+ numGroupingColumns = plan.getGroupingColumns().length;
+ resultTupleLength = numGroupingColumns;
+
+ List<GroupbyNode> groupbyNodes = plan.getGroupByNodes();
+
+ List<DistinctFinalAggregator> aggregatorList = new
ArrayList<DistinctFinalAggregator>();
+ int inTupleIndex = 1 + numGroupingColumns;
+ int outTupleIndex = numGroupingColumns;
+ int distinctSeq = 0;
+
+ for (GroupbyNode eachGroupby : groupbyNodes) {
+ if (eachGroupby.isDistinct()) {
+ aggregatorList.add(new DistinctFinalAggregator(distinctSeq,
inTupleIndex, outTupleIndex, eachGroupby));
+ distinctSeq++;
+
+ Column[] distinctGroupingColumns =
eachGroupby.getGroupingColumns();
+ inTupleIndex += distinctGroupingColumns.length;
+ outTupleIndex += eachGroupby.getAggFunctions().length;
+ } else {
+ nonDistinctAggr = new DistinctFinalAggregator(-1, inTupleIndex,
outTupleIndex, eachGroupby);
+ outTupleIndex += eachGroupby.getAggFunctions().length;
+ }
+ resultTupleLength += eachGroupby.getAggFunctions().length;
+ }
+ aggregators = aggregatorList.toArray(new DistinctFinalAggregator[]{});
+
+ // make output schema mapping index
+ resultTupleIndexes = new int[outSchema.size()];
+ Map<Column, Integer> groupbyResultTupleIndex = new HashMap<Column,
Integer>();
+ int resultTupleIndex = 0;
+ for (Column eachColumn: plan.getGroupingColumns()) {
+ groupbyResultTupleIndex.put(eachColumn, resultTupleIndex);
+ resultTupleIndex++;
+ }
+ for (GroupbyNode eachGroupby : groupbyNodes) {
+ Set<Column> groupingColumnSet = new HashSet<Column>();
+ for (Column column: eachGroupby.getGroupingColumns()) {
+ groupingColumnSet.add(column);
+ }
+ for (Target eachTarget: eachGroupby.getTargets()) {
+ if (!groupingColumnSet.contains(eachTarget.getNamedColumn())) {
+ //aggr function
+ groupbyResultTupleIndex.put(eachTarget.getNamedColumn(),
resultTupleIndex);
+ resultTupleIndex++;
+ }
+ }
+ }
+
+ int index = 0;
+ for (Column eachOutputColumn: outSchema.getColumns()) {
+ // If column is avg aggregation function, outschema's column type is
float
+ // but groupbyResultTupleIndex's column type is protobuf
+
+ int matchedIndex = -1;
+ for (Column eachIndexColumn: groupbyResultTupleIndex.keySet()) {
+ if
(eachIndexColumn.getQualifiedName().equals(eachOutputColumn.getQualifiedName()))
{
+ matchedIndex = groupbyResultTupleIndex.get(eachIndexColumn);
+ break;
+ }
+ }
+ if (matchedIndex < 0) {
+ throw new IOException("Can't find proper output column mapping: "
+ eachOutputColumn);
+ }
+ resultTupleIndexes[matchedIndex] = index++;
+ }
+ }
+
+ Tuple prevKeyTuple = null;
+ Tuple prevTuple = null;
+
+ @Override
+ public Tuple next() throws IOException {
+ if (finished) {
+ return null;
+ }
+
+ Tuple resultTuple = new VTuple(resultTupleLength);
+
+ while (!context.isStopped()) {
+ Tuple childTuple = child.next();
+ // Last tuple
+ if (childTuple == null) {
+ finished = true;
+
+ if (prevTuple == null) {
+ // Empty case
+ if (numGroupingColumns == 0) {
+ // No grouping column, return null tuple
+ return makeEmptyTuple();
+ } else {
+ return null;
+ }
+ }
+
+ for (int i = 0; i < numGroupingColumns; i++) {
+ resultTuple.put(resultTupleIndexes[i], prevTuple.get(i + 1));
+ }
+ for (DistinctFinalAggregator eachAggr: aggregators) {
+ eachAggr.terminate(resultTuple);
+ }
+ break;
+ }
+
+ Tuple tuple = null;
+ try {
+ tuple = childTuple.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new IOException(e.getMessage(), e);
+ }
+
+ int distinctSeq = tuple.get(0).asInt2();
--- End diff --
I expected that the maximum number of distinct aggregation functions will
not overtake short int. If it is necessary, we should describe it to users
later.
> Improve multiple DISTINCT aggregation.
> --------------------------------------
>
> Key: TAJO-1010
> URL: https://issues.apache.org/jira/browse/TAJO-1010
> Project: Tajo
> Issue Type: Improvement
> Components: planner/optimizer
> Reporter: Jaehwa Jung
> Assignee: Jaehwa Jung
>
> Currently, tajo provides three stage for optimizing distinct query
> aggregation. But it just supports one column for distinct aggregation as
> follows:
> {code:title=Query1|borderStyle=solid}
> select a.flag, count(distinct a.id) as cnt, sum(distinct a.id) as total
> from table1
> group by a.flag
> {code}
> If you write two more columns for distinct aggregation, you can't apply
> optimized distinct aggregation as follows:
> {code:title=Query2|borderStyle=solid}
> select a.flag, count(distinct a.id) as cnt, sum(distinct a.id) as total
> , count(distinct a.name) as cnt2, count(distinct a.code) as cnt3
> from table1
> group by a.flag
> {code}
> In this case, you may see low performance for your query. Thus, we need to
> improve multiple DISTINCT aggregation. Correctly, we should support three
> stage for multiple DISTINCT aggregation.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)