Repository: tajo Updated Branches: refs/heads/index_support 5a8fed408 -> 1bf1536db
TAJO-1484 Apply on ColPartitionStoreExec. (Contributed by Navis, committed by hyunsik) Closes #485 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c2aef7d3 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c2aef7d3 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c2aef7d3 Branch: refs/heads/index_support Commit: c2aef7d367859b3c6a08ef82c5024bd73bb5d62b Parents: fa063f0 Author: Hyunsik Choi <[email protected]> Authored: Fri Jul 24 08:17:40 2015 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Fri Jul 24 08:17:40 2015 +0900 ---------------------------------------------------------------------- CHANGES | 3 + .../planner/physical/ColPartitionStoreExec.java | 17 ++---- .../HashBasedColPartitionStoreExec.java | 59 +++++++++---------- .../SortBasedColPartitionStoreExec.java | 60 ++++++++------------ 4 files changed, 59 insertions(+), 80 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/c2aef7d3/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 9aec974..1c79c0d 100644 --- a/CHANGES +++ b/CHANGES @@ -399,6 +399,9 @@ Release 0.11.0 - unreleased SUB TASKS + TAJO-1484 Apply on ColPartitionStoreExec. (Contributed by Navis, + committed by hyunsik) + TAJO-1464: Add ORCFileScanner to read ORCFile table. (Contributed by Jongyoung Park, Committed by jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/c2aef7d3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 76abc6d..33714db 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -63,17 +63,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { super(context, plan.getInSchema(), plan.getOutSchema(), child); this.plan = plan; - if (plan.getType() == NodeType.CREATE_TABLE) { - if (!(plan instanceof CreateTableNode)) { - throw new IllegalArgumentException("plan should be a CreateTableNode type."); - } - this.outSchema = ((CreateTableNode)plan).getTableSchema(); - } else if (plan.getType() == NodeType.INSERT) { - if (!(plan instanceof InsertNode)) { - throw new IllegalArgumentException("plan should be a InsertNode type."); - } - this.outSchema = ((InsertNode)plan).getTableSchema(); - } + this.outSchema = plan.getTableSchema(); // set table meta if (this.plan.hasOptions()) { @@ -171,4 +161,9 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { appender.enableStats(); appender.init(); } + + @Override + public void rescan() throws IOException { + // nothing to do + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/c2aef7d3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java index 0a812ee..1860ec0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java @@ -20,6 +20,8 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.Appender; import org.apache.tajo.storage.Tuple; @@ -36,26 +38,35 @@ import java.util.Map; * This class is a physical operator to store at column partitioned table. */ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { - private final Map<String, Appender> appenderMap = new HashMap<String, Appender>(); + + private final ComparableTuple partKey; + private final Map<ComparableTuple, Appender> appenderMap = new HashMap<ComparableTuple, Appender>(); public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException { super(context, plan, child); + partKey = new ComparableTuple(inSchema, keyIds); } - public void init() throws IOException { - super.init(); - } - - private Appender getAppender(String partition) throws IOException { - Appender appender = appenderMap.get(partition); + private transient final StringBuilder sb = new StringBuilder(); - if (appender == null) { - appender = getNextPartitionAppender(partition); - appenderMap.put(partition, appender); - } else { - appender = appenderMap.get(partition); + private Appender getAppender(ComparableTuple partitionKey, Tuple tuple) throws IOException { + Appender appender = appenderMap.get(partitionKey); + if (appender != null) { + return appender; } + sb.setLength(0); + for (int i = 0; i < keyNum; i++) { + if (i > 0) { + sb.append('/'); + } + sb.append(keyNames[i]).append('='); + Datum datum = tuple.asDatum(keyIds[i]); + sb.append(StringUtils.escapePathName(datum.asChars())); + } + appender = getNextPartitionAppender(sb.toString()); + + appenderMap.put(partitionKey.copy(), appender); return appender; } @@ -65,27 +76,14 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { @Override public Tuple next() throws IOException { Tuple tuple; - StringBuilder sb = new StringBuilder(); while(!context.isStopped() && (tuple = child.next()) != null) { - // set subpartition directory name - sb.delete(0, sb.length()); - if (keyIds != null) { - for(int i = 0; i < keyIds.length; i++) { - if(i > 0) - sb.append("/"); - sb.append(keyNames[i]).append("="); - sb.append(StringUtils.escapePathName(tuple.getText(keyIds[i]))); - } - } - + partKey.set(tuple); // add tuple - Appender appender = getAppender(sb.toString()); - appender.addTuple(tuple); + getAppender(partKey, tuple).addTuple(tuple); } List<TableStats> statSet = new ArrayList<TableStats>(); - for (Map.Entry<String, Appender> entry : appenderMap.entrySet()) { - Appender app = entry.getValue(); + for (Appender app : appenderMap.values()) { app.flush(); app.close(); statSet.add(app.getStats()); @@ -97,9 +95,4 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec { return null; } - - @Override - public void rescan() throws IOException { - // nothing to do - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/c2aef7d3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java index 3a0dd38..607dff7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java @@ -22,9 +22,10 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.statistics.StatisticsUtil; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; import org.apache.tajo.util.StringUtils; import org.apache.tajo.worker.TaskAttemptContext; @@ -35,35 +36,25 @@ import java.io.IOException; * ascending or descending order of partition columns. */ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { - private Tuple currentKey; - private Tuple prevKey; + + private ComparableTuple prevKey; public SortBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) throws IOException { super(context, plan, child); } - public void init() throws IOException { - super.init(); - - currentKey = new VTuple(keyNum); - } - - private void fillKeyTuple(Tuple inTuple, Tuple keyTuple) { - for (int i = 0; i < keyIds.length; i++) { - keyTuple.put(i, inTuple.asDatum(keyIds[i])); - } - } - - private String getSubdirectory(Tuple keyTuple) { - StringBuilder sb = new StringBuilder(); + private transient StringBuilder sb = new StringBuilder(); + private String getSubdirectory(Tuple tuple) { + sb.setLength(0); for(int i = 0; i < keyIds.length; i++) { - if(i > 0) { - sb.append("/"); + Datum datum = tuple.asDatum(keyIds[i]); + if (i > 0) { + sb.append('/'); } - sb.append(keyNames[i]).append("="); - sb.append(StringUtils.escapePathName(keyTuple.getText(i))); + sb.append(keyNames[i]).append('='); + sb.append(StringUtils.escapePathName(datum.asChars())); } return sb.toString(); } @@ -73,22 +64,19 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec { Tuple tuple; while(!context.isStopped() && (tuple = child.next()) != null) { - fillKeyTuple(tuple, currentKey); - if (prevKey == null) { - appender = getNextPartitionAppender(getSubdirectory(currentKey)); - prevKey = new VTuple(currentKey); - } else { - if (!prevKey.equals(currentKey)) { - appender.close(); - StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); - - appender = getNextPartitionAppender(getSubdirectory(currentKey)); - prevKey.put(currentKey.getValues()); - - // reset all states for file rotating - writtenFileNum = 0; - } + appender = getNextPartitionAppender(getSubdirectory(tuple)); + prevKey = new ComparableTuple(inSchema, keyIds); + prevKey.set(tuple); + } else if (!prevKey.equals(tuple)) { + appender.close(); + StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats()); + + appender = getNextPartitionAppender(getSubdirectory(tuple)); + prevKey.set(tuple); + + // reset all states for file rotating + writtenFileNum = 0; } appender.addTuple(tuple);
