Repository: tajo Updated Branches: refs/heads/branch-0.11.2 a8f62b918 -> cd1b133a5
TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on an union block. Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/cd1b133a Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/cd1b133a Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/cd1b133a Branch: refs/heads/branch-0.11.2 Commit: cd1b133a5dfb76090fd8f3a957492428044b3054 Parents: a8f62b9 Author: Jihoon Son <[email protected]> Authored: Mon Feb 22 11:04:57 2016 +0900 Committer: Jihoon Son <[email protected]> Committed: Mon Feb 22 11:04:57 2016 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../tajo/catalog/statistics/StatisticsUtil.java | 31 +++++++++++++++++++- .../tajo/engine/query/TestTableSubQuery.java | 16 +++++++++- .../TestTableSubQuery/testGroupbyOnUnion.result | 6 ++++ .../physical/HashShuffleFileWriteExec.java | 4 +-- .../java/org/apache/tajo/querymaster/Query.java | 6 ++-- .../java/org/apache/tajo/querymaster/Stage.java | 16 +++++----- 7 files changed, 67 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/cd1b133a/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 55beeaa..90db2c8 100644 --- a/CHANGES +++ b/CHANGES @@ -17,6 +17,9 @@ Release 0.11.2 - unreleased BUG FIXES + TAJO-2080: ArrayIndexOutOfBoundsException when performing aggregation on an + union block. (jihoon) + TAJO-2077: Join condition causes incorrect result, when a table has an empty row file. 
(jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd1b133a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
index c481276..b3c18c5 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java
@@ -18,10 +18,14 @@
 
 package org.apache.tajo.catalog.statistics;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 
+import java.util.ArrayList;
 import java.util.List;
 
 public class StatisticsUtil {
@@ -153,4 +157,57 @@ public class StatisticsUtil {
 
     return aggregated;
   }
-}
\ No newline at end of file
+
+  /**
+   * Merges two per-column statistics lists position by position. Null counts are summed and the larger
+   * max / smaller min of each pair is kept; a bound that is null on one side is taken from the other side.
+   *
+   * @param stats1 first list of column statistics
+   * @param stats2 second list; same size as stats1, with the same column type at each position
+   * @return a new list holding one merged ColumnStats per input position
+   * @throws IllegalStateException if the list sizes or the column types at some position differ
+   */
+  public static List<ColumnStats> aggregateColumnStats(List<ColumnStats> stats1, List<ColumnStats> stats2) {
+    Preconditions.checkState(stats1.size() == stats2.size());
+    List<ColumnStats> result = new ArrayList<>(stats1.size());
+    // Bound the loop by the input size: 'result' is still empty here, so result.size() would skip every column.
+    for (int i = 0; i < stats1.size(); i++) {
+      ColumnStats left = stats1.get(i);
+      ColumnStats right = stats2.get(i);
+      Preconditions.checkState(left.getColumn().getTypeDesc().equals(right.getColumn().getTypeDesc()));
+      ColumnStats resultStats = new ColumnStats(left.getColumn());
+      // TODO: resultStats.setNumDistValues();
+      resultStats.setNumNulls(left.getNumNulls() + right.getNumNulls());
+      // NOTE(review): stats created by emptyColumnStats() never get a min/max assigned, so these getters
+      // presumably return null for them -- guard before calling compareTo.
+      if (left.getMaxValue() == null || right.getMaxValue() == null) {
+        resultStats.setMaxValue(left.getMaxValue() == null ? right.getMaxValue() : left.getMaxValue());
+      } else {
+        resultStats.setMaxValue(left.getMaxValue().compareTo(right.getMaxValue()) > 0 ?
+            left.getMaxValue() : right.getMaxValue());
+      }
+      if (left.getMinValue() == null || right.getMinValue() == null) {
+        resultStats.setMinValue(left.getMinValue() == null ? right.getMinValue() : left.getMinValue());
+      } else {
+        resultStats.setMinValue(left.getMinValue().compareTo(right.getMinValue()) < 0 ?
+            left.getMinValue() : right.getMinValue());
+      }
+      result.add(resultStats);
+    }
+    return result;
+  }
+
+  /**
+   * Creates one ColumnStats per root column of the given schema, with no values filled in.
+   *
+   * @param schema schema whose root columns each get a statistics entry
+   * @return a new list with one ColumnStats per root column, in the iteration order of getRootColumns()
+   */
+  public static List<ColumnStats> emptyColumnStats(Schema schema) {
+    List<ColumnStats> stats = new ArrayList<>(schema.size());
+    for (Column column : schema.getRootColumns()) {
+      stats.add(new ColumnStats(column));
+    }
+    return stats;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd1b133a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
index f59bac7..8703834 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTableSubQuery.java
@@ -109,4 +109,18 @@ public class TestTableSubQuery extends QueryTestCaseBase {
   public void testMultipleSubqueriesWithAggregation() throws Exception {
     runSimpleTests();
   }
-}
\ No newline at end of file
+
+  @Test
+  @Option(sort = true)
+  @SimpleTest(
+      queries = @QuerySpec("" +
+          "select sum(t.cnt) as cnt, l_orderkey, l_partkey, 'my view' from (" +
+          "select l_orderkey, l_partkey, CAST(COUNT(1) AS INT4) as cnt from lineitem group by l_orderkey, l_partkey " +
+          "union all " +
+          "select l_orderkey, l_partkey, l_linenumber as cnt from lineitem) as t " +
+          "group by l_orderkey, l_partkey")
+  )
+  public void testGroupbyOnUnion() throws Exception {
+    runSimpleTests();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/cd1b133a/tajo-core-tests/src/test/resources/results/TestTableSubQuery/testGroupbyOnUnion.result
---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestTableSubQuery/testGroupbyOnUnion.result b/tajo-core-tests/src/test/resources/results/TestTableSubQuery/testGroupbyOnUnion.result new file mode 100644 index 0000000..a462d7e --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestTableSubQuery/testGroupbyOnUnion.result @@ -0,0 +1,6 @@ +cnt,l_orderkey,l_partkey,?text +------------------------------- +2,3,2,my view +2,2,2,my view +5,1,1,my view +3,3,3,my view \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/cd1b133a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 49b0e11..5563ab9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -170,7 +170,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { writtenBytes += usedBufferSize; usedBufferSize = totalBufferCapacity = 0; - TableStats aggregated = (TableStats) child.getInputStats().clone(); + TableStats aggregated = new TableStats(); aggregated.setNumBytes(writtenBytes); aggregated.setNumRows(numRows); context.setResultStats(aggregated); @@ -243,4 +243,4 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { progress = 1.0f; super.close(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/tajo/blob/cd1b133a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java ---------------------------------------------------------------------- diff --git 
a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index d447476..33320cb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -505,7 +505,7 @@ public class Query implements EventHandler<QueryEvent> { query.context.getQueryContext(), lastStage.getId(), lastStage.getMasterPlan().getLogicalPlan(), - lastStage.getSchema(), + lastStage.getOutSchema(), tableDesc); QueryHookExecutor hookExecutor = new QueryHookExecutor(query.context.getQueryMasterContext()); @@ -636,7 +636,7 @@ public class Query implements EventHandler<QueryEvent> { TableDesc resultTableDesc = new TableDesc( query.getId().toString(), - lastStage.getSchema(), + lastStage.getOutSchema(), meta, finalOutputDir.toUri()); resultTableDesc.setExternal(true); @@ -713,7 +713,7 @@ public class Query implements EventHandler<QueryEvent> { finalTable = catalog.getTableDesc(tableName); } else { String tableName = query.getId().toString(); - finalTable = new TableDesc(tableName, lastStage.getSchema(), meta, finalOutputDir.toUri()); + finalTable = new TableDesc(tableName, lastStage.getOutSchema(), meta, finalOutputDir.toUri()); } long volume = getTableVolume(query.systemConf, finalOutputDir); http://git-wip-us.apache.org/repos/asf/tajo/blob/cd1b133a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 51c5431..04ff115 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -45,7 +45,6 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.error.Errors.SerializedException; import 
org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.exception.TajoException; -import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TaskState; import org.apache.tajo.master.event.*; @@ -96,7 +95,7 @@ public class Stage implements EventHandler<StageEvent> { private MasterPlan masterPlan; private ExecutionBlock block; private int priority; - private Schema schema; + private Schema outSchema; private TableMeta meta; private TableStats resultStatistics; private TableStats inputStatistics; @@ -592,8 +591,8 @@ public class Stage implements EventHandler<StageEvent> { return tasks.get(qid); } - public Schema getSchema() { - return schema; + public Schema getOutSchema() { + return outSchema; } public TableMeta getTableMeta() { @@ -667,8 +666,7 @@ public class Stage implements EventHandler<StageEvent> { long[] numRows = new long[]{0, 0}; int[] numBlocks = new int[]{0, 0}; int[] numOutputs = new int[]{0, 0}; - - List<ColumnStats> columnStatses = Lists.newArrayList(); + List<ColumnStats> columnStatses = StatisticsUtil.emptyColumnStats(stage.getDataChannel().getSchema()); MasterPlan masterPlan = stage.getMasterPlan(); for (ExecutionBlock block : masterPlan.getChilds(stage.getBlock())) { @@ -687,7 +685,9 @@ public class Stage implements EventHandler<StageEvent> { numOutputs[i] += childStatArray[i].getNumShuffleOutputs(); numRows[i] += childStatArray[i].getNumRows(); } - columnStatses.addAll(childStatArray[1].getColumnStats()); + if (childStatArray[1].getColumnStats() != null && childStatArray[1].getColumnStats().size() > 0) { + columnStatses = StatisticsUtil.aggregateColumnStats(columnStatses, childStatArray[1].getColumnStats()); + } } for (int i = 0; i < 2; i++) { @@ -794,7 +794,7 @@ public class Stage implements EventHandler<StageEvent> { dataFormat = channel.getDataFormat(); } - schema = channel.getSchema(); + outSchema = channel.getSchema(); meta = CatalogUtil.newTableMeta(dataFormat, new 
KeyValueSet()); inputStatistics = statsArray[0]; resultStatistics = statsArray[1];
