Repository: tajo
Updated Branches:
  refs/heads/master fa063f0e8 -> c2aef7d36


TAJO-1484: Apply on ColPartitionStoreExec. (Contributed by Navis, committed by hyunsik)

Closes #485


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/c2aef7d3
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/c2aef7d3
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/c2aef7d3

Branch: refs/heads/master
Commit: c2aef7d367859b3c6a08ef82c5024bd73bb5d62b
Parents: fa063f0
Author: Hyunsik Choi <[email protected]>
Authored: Fri Jul 24 08:17:40 2015 +0900
Committer: Hyunsik Choi <[email protected]>
Committed: Fri Jul 24 08:17:40 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 +
 .../planner/physical/ColPartitionStoreExec.java | 17 ++----
 .../HashBasedColPartitionStoreExec.java         | 59 +++++++++----------
 .../SortBasedColPartitionStoreExec.java         | 60 ++++++++------------
 4 files changed, 59 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/c2aef7d3/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 9aec974..1c79c0d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -399,6 +399,9 @@ Release 0.11.0 - unreleased
 
   SUB TASKS
 
+    TAJO-1484: Apply on ColPartitionStoreExec. (Contributed by Navis, 
+    Committed by hyunsik)
+
     TAJO-1464: Add ORCFileScanner to read ORCFile table. (Contributed by 
     Jongyoung Park, Committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2aef7d3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 76abc6d..33714db 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -63,17 +63,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
 
-    if (plan.getType() == NodeType.CREATE_TABLE) {
-      if (!(plan instanceof CreateTableNode)) {
-        throw new IllegalArgumentException("plan should be a CreateTableNode 
type.");
-      }
-      this.outSchema = ((CreateTableNode)plan).getTableSchema();
-    } else if (plan.getType() == NodeType.INSERT) {
-      if (!(plan instanceof InsertNode)) {
-        throw new IllegalArgumentException("plan should be a InsertNode 
type.");
-      }
-      this.outSchema = ((InsertNode)plan).getTableSchema();
-    }
+    this.outSchema = plan.getTableSchema();
 
     // set table meta
     if (this.plan.hasOptions()) {
@@ -171,4 +161,9 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
     appender.enableStats();
     appender.init();
   }
+
+  @Override
+  public void rescan() throws IOException {
+    // nothing to do
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2aef7d3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
index 0a812ee..1860ec0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -20,6 +20,8 @@ package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.datum.Datum;
+import 
org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple;
 import org.apache.tajo.plan.logical.StoreTableNode;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.Tuple;
@@ -36,26 +38,35 @@ import java.util.Map;
  * This class is a physical operator to store at column partitioned table.
  */
 public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
-  private final Map<String, Appender> appenderMap = new HashMap<String, 
Appender>();
+
+  private final ComparableTuple partKey;
+  private final Map<ComparableTuple, Appender> appenderMap = new 
HashMap<ComparableTuple, Appender>();
 
   public HashBasedColPartitionStoreExec(TaskAttemptContext context, 
StoreTableNode plan, PhysicalExec child)
       throws IOException {
     super(context, plan, child);
+    partKey = new ComparableTuple(inSchema, keyIds);
   }
 
-  public void init() throws IOException {
-    super.init();
-  }
-
-  private Appender getAppender(String partition) throws IOException {
-    Appender appender = appenderMap.get(partition);
+  private transient final StringBuilder sb = new StringBuilder();
 
-    if (appender == null) {
-      appender = getNextPartitionAppender(partition);
-      appenderMap.put(partition, appender);
-    } else {
-      appender = appenderMap.get(partition);
+  private Appender getAppender(ComparableTuple partitionKey, Tuple tuple) 
throws IOException {
+    Appender appender = appenderMap.get(partitionKey);
+    if (appender != null) {
+      return appender;
     }
+    sb.setLength(0);
+    for (int i = 0; i < keyNum; i++) {
+      if (i > 0) {
+        sb.append('/');
+      }
+      sb.append(keyNames[i]).append('=');
+      Datum datum = tuple.asDatum(keyIds[i]);
+      sb.append(StringUtils.escapePathName(datum.asChars()));
+    }
+    appender = getNextPartitionAppender(sb.toString());
+
+    appenderMap.put(partitionKey.copy(), appender);
     return appender;
   }
 
@@ -65,27 +76,14 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
   @Override
   public Tuple next() throws IOException {
     Tuple tuple;
-    StringBuilder sb = new StringBuilder();
     while(!context.isStopped() && (tuple = child.next()) != null) {
-      // set subpartition directory name
-      sb.delete(0, sb.length());
-      if (keyIds != null) {
-        for(int i = 0; i < keyIds.length; i++) {
-          if(i > 0)
-            sb.append("/");
-          sb.append(keyNames[i]).append("=");
-          sb.append(StringUtils.escapePathName(tuple.getText(keyIds[i])));
-        }
-      }
-
+      partKey.set(tuple);
       // add tuple
-      Appender appender = getAppender(sb.toString());
-      appender.addTuple(tuple);
+      getAppender(partKey, tuple).addTuple(tuple);
     }
 
     List<TableStats> statSet = new ArrayList<TableStats>();
-    for (Map.Entry<String, Appender> entry : appenderMap.entrySet()) {
-      Appender app = entry.getValue();
+    for (Appender app : appenderMap.values()) {
       app.flush();
       app.close();
       statSet.add(app.getStats());
@@ -97,9 +95,4 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
 
     return null;
   }
-
-  @Override
-  public void rescan() throws IOException {
-    // nothing to do
-  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/c2aef7d3/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index 3a0dd38..607dff7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -22,9 +22,10 @@
 package org.apache.tajo.engine.planner.physical;
 
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.datum.Datum;
+import 
org.apache.tajo.engine.planner.physical.ComparableVector.ComparableTuple;
 import org.apache.tajo.plan.logical.StoreTableNode;
 import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.StringUtils;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -35,35 +36,25 @@ import java.io.IOException;
  * ascending or descending order of partition columns.
  */
 public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
-  private Tuple currentKey;
-  private Tuple prevKey;
+
+  private ComparableTuple prevKey;
 
   public SortBasedColPartitionStoreExec(TaskAttemptContext context, 
StoreTableNode plan, PhysicalExec child)
       throws IOException {
     super(context, plan, child);
   }
 
-  public void init() throws IOException {
-    super.init();
-
-    currentKey = new VTuple(keyNum);
-  }
-
-  private void fillKeyTuple(Tuple inTuple, Tuple keyTuple) {
-    for (int i = 0; i < keyIds.length; i++) {
-      keyTuple.put(i, inTuple.asDatum(keyIds[i]));
-    }
-  }
-
-  private String getSubdirectory(Tuple keyTuple) {
-    StringBuilder sb = new StringBuilder();
+  private transient StringBuilder sb = new StringBuilder();
 
+  private String getSubdirectory(Tuple tuple) {
+    sb.setLength(0);
     for(int i = 0; i < keyIds.length; i++) {
-      if(i > 0) {
-        sb.append("/");
+      Datum datum = tuple.asDatum(keyIds[i]);
+      if (i > 0) {
+        sb.append('/');
       }
-      sb.append(keyNames[i]).append("=");      
-      sb.append(StringUtils.escapePathName(keyTuple.getText(i)));
+      sb.append(keyNames[i]).append('=');
+      sb.append(StringUtils.escapePathName(datum.asChars()));
     }
     return sb.toString();
   }
@@ -73,22 +64,19 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
     Tuple tuple;
     while(!context.isStopped() && (tuple = child.next()) != null) {
 
-      fillKeyTuple(tuple, currentKey);
-
       if (prevKey == null) {
-        appender = getNextPartitionAppender(getSubdirectory(currentKey));
-        prevKey = new VTuple(currentKey);
-      } else {
-        if (!prevKey.equals(currentKey)) {
-          appender.close();
-          StatisticsUtil.aggregateTableStat(aggregatedStats, 
appender.getStats());
-
-          appender = getNextPartitionAppender(getSubdirectory(currentKey));
-          prevKey.put(currentKey.getValues());
-
-          // reset all states for file rotating
-          writtenFileNum = 0;
-        }
+        appender = getNextPartitionAppender(getSubdirectory(tuple));
+        prevKey = new ComparableTuple(inSchema, keyIds);
+        prevKey.set(tuple);
+      } else if (!prevKey.equals(tuple)) {
+        appender.close();
+        StatisticsUtil.aggregateTableStat(aggregatedStats, 
appender.getStats());
+
+        appender = getNextPartitionAppender(getSubdirectory(tuple));
+        prevKey.set(tuple);
+
+        // reset all states for file rotating
+        writtenFileNum = 0;
       }
 
       appender.addTuple(tuple);

Reply via email to