DRILL-176:  Updates to affinity calculator, fixes for parquet serialization.  
Fix to ErrorHelper looping


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617

Branch: refs/heads/master
Commit: 7edd36170a9be291a69e44f6090474193485bf14
Parents: d6ae53e
Author: Steven Phillips <[email protected]>
Authored: Thu Aug 22 16:18:55 2013 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Thu Aug 22 16:18:55 2013 -0700

----------------------------------------------------------------------
 .../drill/exec/planner/fragment/Wrapper.java    |   5 +-
 .../drill/exec/store/AffinityCalculator.java    |  91 ++++++----
 .../exec/store/parquet/ParquetGroupScan.java    | 177 +++++++++----------
 .../exec/store/parquet/ParquetRecordReader.java |   2 +-
 .../store/parquet/ParquetScanBatchCreator.java  |  10 +-
 .../drill/exec/work/foreman/ErrorHelper.java    |   8 +-
 .../exec/store/TestParquetPhysicalPlan.java     |  55 +++++-
 .../store/parquet/ParquetRecordReaderTest.java  |  52 +++++-
 .../parquet_scan_union_screen_physical.json     |   5 +-
 9 files changed, 257 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index d5a24b0..8c4b0b4 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -151,15 +151,12 @@ public class Wrapper {
       for (int i = start; i < start + width; i++) {
         endpoints.add(all.get(i % div));
       }
-    } else if (values.size() < width) {
-      throw new NotImplementedException(
-          "Haven't implemented a scenario where we have some node affinity but 
the affinity list is smaller than the expected width.");
     } else {
       // get nodes with highest affinity.
       Collections.sort(values);
       values = Lists.reverse(values);
       for (int i = 0; i < width; i++) {
-        endpoints.add(values.get(i).getEndpoint());
+        endpoints.add(values.get(i%values.size()).getEndpoint());
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
index b4092cc..b341ea4 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
@@ -1,6 +1,7 @@
 package org.apache.drill.exec.store;
 
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.ImmutableRangeMap;
 import com.google.common.collect.Range;
 import org.apache.drill.exec.store.parquet.ParquetGroupScan;
@@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 
 public class AffinityCalculator {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
@@ -24,6 +26,7 @@ public class AffinityCalculator {
   String fileName;
   Collection<DrillbitEndpoint> endpoints;
   HashMap<String,DrillbitEndpoint> endPointMap;
+  Stopwatch watch = new Stopwatch();
 
   public AffinityCalculator(String fileName, FileSystem fs, 
Collection<DrillbitEndpoint> endpoints) {
     this.fs = fs;
@@ -33,16 +36,20 @@ public class AffinityCalculator {
     buildEndpointMap();
   }
 
+  /**
+   * Builds a mapping of file byte ranges to their block locations
+   */
   private void buildBlockMap() {
     try {
+      watch.start();
       FileStatus file = fs.getFileStatus(new Path(fileName));
-      long tC = System.nanoTime();
       blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
-      long tD = System.nanoTime();
+      watch.stop();
       logger.debug("Block locations: {}", blocks);
-      logger.debug("Took {} ms to get Block locations", (float)(tD - tC) / 
1e6);
+      logger.debug("Took {} ms to get Block locations", 
watch.elapsed(TimeUnit.MILLISECONDS));
     } catch (IOException ioe) { throw new RuntimeException(ioe); }
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new 
ImmutableRangeMap.Builder<Long,BlockLocation>();
     for (BlockLocation block : blocks) {
       long start = block.getOffset();
@@ -51,62 +58,72 @@ public class AffinityCalculator {
       blockMapBuilder = blockMapBuilder.put(range, block);
     }
     blockMap = blockMapBuilder.build();
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
+    watch.stop();
+    logger.debug("Took {} ms to build block map", 
watch.elapsed(TimeUnit.MILLISECONDS));
   }
   /**
+   * For a given RowGroup, calculate how many bytes are available on each 
drillbit endpoint
    *
-   * @param entry
+   * @param rowGroup the RowGroup to calculate endpoint bytes for
    */
-  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
-    long tA = System.nanoTime();
+  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
+    watch.reset();
+    watch.start();
     HashMap<String,Long> hostMap = new HashMap<>();
-    long start = entry.getStart();
-    long end = start + entry.getLength();
-    Range<Long> entryRange = Range.closedOpen(start, end);
-    ImmutableRangeMap<Long,BlockLocation> subRangeMap = 
blockMap.subRangeMap(entryRange);
-    for (Map.Entry<Range<Long>,BlockLocation> e : 
subRangeMap.asMapOfRanges().entrySet()) {
-      String[] hosts = null;
-      Range<Long> blockRange = e.getKey();
+    HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
+    long start = rowGroup.getStart();
+    long end = start + rowGroup.getLength();
+    Range<Long> rowGroupRange = Range.closedOpen(start, end);
+
+    // Find submap of ranges that intersect with the rowGroup
+    ImmutableRangeMap<Long,BlockLocation> subRangeMap = 
blockMap.subRangeMap(rowGroupRange);
+
+    // Iterate through each block in this submap and get the host for the 
block location
+    for (Map.Entry<Range<Long>,BlockLocation> block : 
subRangeMap.asMapOfRanges().entrySet()) {
+      String[] hosts;
+      Range<Long> blockRange = block.getKey();
       try {
-        hosts = e.getValue().getHosts();
-      } catch (IOException ioe) { /*TODO Handle this exception */}
-      Range<Long> intersection = entryRange.intersection(blockRange);
+        hosts = block.getValue().getHosts();
+      } catch (IOException ioe) {
+        throw new RuntimeException("Failed to get hosts for block location", 
ioe);
+      }
+      Range<Long> intersection = rowGroupRange.intersection(blockRange);
       long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
+
+      // For each host in the current block location, add the intersecting 
bytes to the corresponding endpoint
       for (String host : hosts) {
-        if (hostMap.containsKey(host)) {
-          hostMap.put(host, hostMap.get(host) + bytes);
+        DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
+        if (endpointByteMap.containsKey(endpoint)) {
+          endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) + bytes);
         } else {
-          hostMap.put(host, bytes);
+          if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
         }
       }
     }
-    HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
-    try {
-      for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
-        String host = hostEntry.getKey();
-        Long bytes = hostEntry.getValue();
-        DrillbitEndpoint d = getDrillBitEndpoint(host);
-        if (d != null ) ebs.put(d, bytes);
-      }
-    } catch (NullPointerException n) {}
-    entry.setEndpointBytes(ebs);
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) / 1e6);
+
+    rowGroup.setEndpointBytes(endpointByteMap);
+    rowGroup.setMaxBytes(endpointByteMap.size() > 0 ? 
Collections.max(endpointByteMap.values()) : 0);
+    logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(), 
rowGroup.getStart(), rowGroup.getMaxBytes());
+    watch.stop();
+    logger.debug("Took {} ms to set endpoint bytes", 
watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
   private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
     return endPointMap.get(hostName);
   }
 
+  /**
+   * Builds a mapping of hostnames to drillbit endpoints
+   */
   private void buildEndpointMap() {
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     endPointMap = new HashMap<String, DrillbitEndpoint>();
     for (DrillbitEndpoint d : endpoints) {
       String hostName = d.getAddress();
       endPointMap.put(hostName, d);
     }
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) / 1e6);
+    watch.stop();
+    logger.debug("Took {} ms to build endpoint map", 
watch.elapsed(TimeUnit.MILLISECONDS));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 9e48d33..64ced87 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,14 +18,13 @@
 package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.exception.SetupException;
 import org.apache.drill.exec.physical.EndpointAffinity;
@@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
 public class ParquetGroupScan extends AbstractGroupScan {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
 
-  private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
+  private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> 
mappings;
   private List<RowGroupInfo> rowGroupInfos;
+  private Stopwatch watch = new Stopwatch();
 
   public List<ReadEntryWithPath> getEntries() {
     return entries;
@@ -110,16 +110,14 @@ public class ParquetGroupScan extends AbstractGroupScan {
   }
 
   private void readFooter() throws IOException {
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     rowGroupInfos = new ArrayList();
     long start = 0, length = 0;
     ColumnChunkMetaData columnChunkMetaData;
     for (ReadEntryWithPath readEntryWithPath : entries){
       Path path = new Path(readEntryWithPath.getPath());
       ParquetMetadata footer = 
ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
-//      FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig());
-//      FileStatus status = fs.getFileStatus(path);
-//      ParquetMetadata footer = 
ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
       readEntryWithPath.getPath();
 
       int i = 0;
@@ -138,38 +136,21 @@ public class ParquetGroupScan extends AbstractGroupScan {
         i++;
       }
     }
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to get row group infos", (float)(tB - tA) / 1E6);
+    watch.stop();
+    logger.debug("Took {} ms to get row group infos", 
watch.elapsed(TimeUnit.MILLISECONDS));
   }
 
   private void calculateEndpointBytes() {
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     AffinityCalculator ac = new AffinityCalculator(fileName, fs, 
availableEndpoints);
     for (RowGroupInfo e : rowGroupInfos) {
       ac.setEndpointBytes(e);
       totalBytes += e.getLength();
     }
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB - tA) / 
1E6);
+    watch.stop();
+    logger.debug("Took {} ms to calculate EndpointBytes", 
watch.elapsed(TimeUnit.MILLISECONDS));
   }
-/*
-  public LinkedList<RowGroupInfo> getRowGroups() {
-    return rowGroups;
-  }
-
-  public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
-    this.rowGroups = rowGroups;
-  }
-
-  public static class ParquetFileReadEntry {
-
-    String path;
-
-    public ParquetFileReadEntry(@JsonProperty String path){
-      this.path = path;
-    }
-  }
-  */
 
   @JsonIgnore
   public FileSystem getFileSystem() {
@@ -232,16 +213,22 @@ public class ParquetGroupScan extends AbstractGroupScan {
     }
   }
 
+  /**
+   * Calculates the affinity each endpoint has for this scan, by adding up the 
affinity the endpoint has for each
+   * rowGroup
+   * @return a list of EndpointAffinity objects
+   */
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
-    long tA = System.nanoTime();
+    watch.reset();
+    watch.start();
     if (this.endpointAffinities == null) {
       HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
       for (RowGroupInfo entry : rowGroupInfos) {
         for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
           long bytes = entry.getEndpointBytes().get(d);
           float affinity = (float)bytes / (float)totalBytes;
-          logger.error("RowGroup: {} Endpoint: {} Bytes: {}", 
entry.getRowGroupIndex(), d.getAddress(), bytes);
+          logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", 
entry.getRowGroupIndex(), d.getAddress(), bytes);
           if (affinities.keySet().contains(d)) {
             affinities.put(d, affinities.get(d) + affinity);
           } else {
@@ -256,83 +243,90 @@ public class ParquetGroupScan extends AbstractGroupScan {
       }
       this.endpointAffinities = affinityList;
     }
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to get operator affinity", (float)(tB - tA) / 
1E6);
+    watch.stop();
+    logger.debug("Took {} ms to get operator affinity", 
watch.elapsed(TimeUnit.MILLISECONDS));
     return this.endpointAffinities;
   }
 
 
+  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
 
-
+  /**
+   *
+   * @param incomingEndpoints
+   */
   @Override
-  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
-    long tA = System.nanoTime();
-    Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
-
-    int i = 0;
-    for (DrillbitEndpoint endpoint : endpoints) {
-      logger.debug("Endpoint index {}, endpoint host: {}", i++, 
endpoint.getAddress());
+  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
+    watch.reset();
+    watch.start();
+    Preconditions.checkArgument(incomingEndpoints.size() <= 
rowGroupInfos.size());
+    mappings = ArrayListMultimap.create();
+    ArrayList rowGroupList = new ArrayList(rowGroupInfos);
+    List<DrillbitEndpoint> endpointLinkedlist = 
Lists.newLinkedList(incomingEndpoints);
+    for(double cutoff : ASSIGNMENT_CUTOFFS ){
+      scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false);
     }
-
-    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
-    mappings = new LinkedList[endpoints.size()];
-    LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, 
rowGroupInfos, 100, true, false);
-    LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, 
unassigned, 50, true, false);
-    LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, 
unassigned2, 25, true, false);
-    LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, 
unassigned3, 0, false, false);
-    LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, 
unassigned4, 0, false, true);
-    assert unassigned5.size() == 0 : String.format("All readEntries should be 
assigned by now, but some are still unassigned");
-    long tB = System.nanoTime();
-    logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / 1E6);
+    scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
+    watch.stop();
+    logger.debug("Took {} ms to apply assignments", 
watch.elapsed(TimeUnit.MILLISECONDS));
+    Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries 
should be assigned by now, but some are still unassigned");
+    Preconditions.checkArgument(!rowGroupInfos.isEmpty());
   }
 
-  private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> 
endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean 
mustContain, boolean assignAll) {
-    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
-    LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
-
-    int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() * 1.5);
+  public int fragmentPointer = 0;
+
+  /**
+   *
+   * @param endpointAssignments the mapping between fragment/endpoint and 
rowGroup
+   * @param endpoints the list of drillbits, ordered by the corresponding 
fragment
+   * @param rowGroups the list of rowGroups to assign
+   * @param requiredPercentage the fraction (0.0 to 1.0) of the row group's 
max bytes required to make an assignment
+   * @param assignAll if true, will assign even if no affinity
+   */
+  private void scanAndAssign (Multimap<Integer, 
ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, 
List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double 
requiredPercentage, boolean assignAll) {
+    Collections.sort(rowGroups, new ParquetReadEntryComparator());
+    final boolean requireAffinity = requiredPercentage > 0;
+    int maxAssignments = (int) (rowGroups.size() / endpoints.size());
+
+    if (maxAssignments < 1) maxAssignments = 1;
+
+    for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); iter.hasNext();){
+      RowGroupInfo rowGroupInfo = iter.next();
+      for (int i = 0; i < endpoints.size(); i++) {
+        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
+        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
+        Map<DrillbitEndpoint, Long> bytesPerEndpoint = 
rowGroupInfo.getEndpointBytes();
+        boolean haveAffinity = bytesPerEndpoint.containsKey(currentEndpoint) ;
 
-    if (maxEntries < 1) maxEntries = 1;
-
-    int i =0;
-    for(RowGroupInfo e : rowGroups) {
-      boolean assigned = false;
-      for (int j = i; j < i + endpoints.size(); j++) {
-        DrillbitEndpoint currentEndpoint = endpoints.get(j%endpoints.size());
         if (assignAll ||
-                (e.getEndpointBytes().size() > 0 &&
-                (e.getEndpointBytes().containsKey(currentEndpoint) || 
!mustContain) &&
-                (mappings[j%endpoints.size()] == null || 
mappings[j%endpoints.size()].size() < maxEntries) &&
-                e.getEndpointBytes().get(currentEndpoint) >= e.getMaxBytes() * 
requiredPercentage / 100)) {
-          LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = 
mappings[j%endpoints.size()];
-          if(entries == null){
-            entries = new LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
-            mappings[j%endpoints.size()] = entries;
-          }
-          entries.add(e.getRowGroupReadEntry());
-          logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", 
e.getPath(), e.getStart(), currentEndpoint.getAddress());
-          assigned = true;
+                (!bytesPerEndpoint.isEmpty() &&
+                        (!requireAffinity || haveAffinity) &&
+                        (!endpointAssignments.containsKey(minorFragmentId) || 
endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
+                        bytesPerEndpoint.get(currentEndpoint) >= 
rowGroupInfo.getMaxBytes() * requiredPercentage)) {
+
+          endpointAssignments.put(minorFragmentId, 
rowGroupInfo.getRowGroupReadEntry());
+          logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint 
{}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, 
endpoints.get(minorFragmentId).getAddress());
+          iter.remove();
+          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
           break;
         }
       }
-      if (!assigned) unassigned.add(e);
-      i++;
+
     }
-    return unassigned;
   }
 
   @Override
   public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
-    assert minorFragmentId < mappings.length : String.format("Mappings length 
[%d] should be longer than minor fragment id [%d] but it isn't.", 
mappings.length, minorFragmentId);
-    for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) 
{
+    assert minorFragmentId < mappings.size() : String.format("Mappings length 
[%d] should be longer than minor fragment id [%d] but it isn't.", 
mappings.size(), minorFragmentId);
+    for (ParquetRowGroupScan.RowGroupReadEntry rg : 
mappings.get(minorFragmentId)) {
       logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: 
{}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
     }
+    Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), 
String.format("MinorFragmentId %d has no read entries assigned", 
minorFragmentId));
     try {
-      return new ParquetRowGroupScan(storageEngine, engineConfig, 
mappings[minorFragmentId]);
+      return new ParquetRowGroupScan(storageEngine, engineConfig, 
mappings.get(minorFragmentId));
     } catch (SetupException e) {
-      e.printStackTrace(); // TODO - fix this
+      throw new RuntimeException("Error setting up ParquetRowGroupScan", e);
     }
-    return null;
   }
 
   @Override
@@ -342,7 +336,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
 
   @Override
   public OperatorCost getCost() {
-    return new OperatorCost(1,1,1,1);
+    //TODO Figure out how to properly calculate cost
+    return new OperatorCost(1,rowGroupInfos.size(),1,1);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 4e46034..3aaa987 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -211,8 +211,8 @@ public class ParquetRecordReader implements RecordReader {
       }
       for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
         output.addField(r.valueVecHolder.getValueVector());
-        output.setNewSchema();
       }
+      output.setNewSchema();
     }catch(SchemaChangeException e) {
       throw new ExecutionSetupException("Error setting up output mutator.", e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 03fb4ec..addd288 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Stopwatch;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
 
 public class ParquetScanBatchCreator implements 
BatchCreator<ParquetRowGroupScan>{
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
+  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
 
   @Override
   public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan 
rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
-    long tA = System.nanoTime(), tB;
-    System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " 
:Start of ScanBatCreator.scanBatch");
+    Stopwatch watch = new Stopwatch();
+    watch.start();
     Preconditions.checkArgument(children.isEmpty());
     List<RecordReader> readers = Lists.newArrayList();
     for(ParquetRowGroupScan.RowGroupReadEntry e : 
rowGroupScan.getRowGroupReadEntries()){
@@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements 
BatchCreator<ParquetRowGroupScan
         throw new ExecutionSetupException(e1);
       }
     }
-    System.out.println( "Total time in method: " + ((float) (System.nanoTime() 
- tA) / 1e9));
+    logger.debug("total time in ScanBatchCreator.getBatch: {} ms", 
watch.elapsed(TimeUnit.MILLISECONDS));
     return new ScanBatch(context, readers.iterator());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
index 9a33109..72c5f34 100644
--- 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
+++ 
b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
@@ -35,8 +35,8 @@ public class ErrorHelper {
     if(message != null){
       sb.append(message);
     }
-      
-    do{
+
+    while (true) {
       sb.append(" < ");
       sb.append(t.getClass().getSimpleName());
       if(t.getMessage() != null){
@@ -44,7 +44,9 @@ public class ErrorHelper {
         sb.append(t.getMessage());
         sb.append(" ]");
       }
-    }while(t.getCause() != null && t.getCause() != t);
+      if (t.getCause() == null || t.getCause() == t) break;
+      t = t.getCause();
+    }
     
     builder.setMessage(sb.toString());
     

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
index e2a00f1..18ac294 100644
--- 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
+++ 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
@@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan;
 import org.apache.drill.exec.planner.PhysicalPlanReader;
 import org.apache.drill.exec.proto.CoordinationProtos;
 import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.rpc.user.UserResultsListener;
+import org.apache.drill.exec.server.BootStrapContext;
 import org.apache.drill.exec.server.Drillbit;
 import org.apache.drill.exec.server.RemoteServiceSet;
 import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -29,6 +35,7 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import static junit.framework.Assert.assertNull;
 import static org.junit.Assert.assertEquals;
@@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan {
 
   //public String fileName = "/physical_test2.json";
   public String fileName = "parquet_scan_union_screen_physical.json";
+//  public String fileName = "parquet-sample.json";
+
 
   @Test
   @Ignore
@@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan {
       bit1.run();
       client.connect();
       List<QueryResultBatch> results = 
client.runQuery(UserProtos.QueryType.PHYSICAL, 
Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
-      System.out.println(String.format("Got %d results", results.size()));
+      RecordBatchLoader loader = new 
RecordBatchLoader(bit1.getContext().getAllocator());
+      for (QueryResultBatch b : results) {
+        System.out.println(String.format("Got %d results", 
b.getHeader().getRowCount()));
+        loader.load(b.getHeader().getDef(), b.getData());
+        for (VectorWrapper vw : loader) {
+          System.out.println(vw.getValueVector().getField().getName());
+          ValueVector vv = vw.getValueVector();
+          for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
+            Object o = vv.getAccessor().getObject(i);
+            System.out.println(vv.getAccessor().getObject(i));
+          }
+        }
+      }
+      client.close();
+    }
+  }
+
+  private class ParquetResultsListener implements UserResultsListener {
+    private CountDownLatch latch = new CountDownLatch(1);
+    @Override
+    public void submissionFailed(RpcException ex) {
+      logger.error("submission failed", ex);
+      latch.countDown();
+    }
+
+    @Override
+    public void resultArrived(QueryResultBatch result) {
+      System.out.printf("Result batch arrived. Number of records: %d", 
result.getHeader().getRowCount());
+      if (result.getHeader().getIsLastChunk()) latch.countDown();
+    }
+
+    public void await() throws Exception {
+      latch.await();
+    }
+  }
+  @Test
+  @Ignore
+  public void testParseParquetPhysicalPlanRemote() throws Exception {
+    DrillConfig config = DrillConfig.create();
+
+    try(DrillClient client = new DrillClient(config);){
+      client.connect();
+      ParquetResultsListener listener = new ParquetResultsListener();
+      client.runQuery(UserProtos.QueryType.PHYSICAL, 
Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), listener);
+      listener.await();
       client.close();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 1d91455..7a99c3f 100644
--- 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ 
b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -48,6 +48,7 @@ import 
org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import parquet.bytes.BytesInput;
@@ -68,6 +69,7 @@ public class ParquetRecordReaderTest {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
 
   private boolean VERBOSE_DEBUG = false;
+  private boolean checkValues = true;
 
   static final int numberRowGroups = 20;
   static final int recordsPerRowGroup = 300000;
@@ -100,6 +102,20 @@ public class ParquetRecordReaderTest {
     testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, 
recordsPerRowGroup);
   }
 
+  @Test
+  @Ignore
+  public void testLocalDistributed() throws Exception {
+    String planName = "/parquet/parquet_scan_union_screen_physical.json";
+    testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, 
300000);
+  }
+
+  @Test
+  @Ignore
+  public void testRemoteDistributed() throws Exception {
+    String planName = "/parquet/parquet_scan_union_screen_physical.json";
+    testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
+  }
+
 
   private class ParquetResultListener implements UserResultsListener {
     private SettableFuture<Void> future = SettableFuture.create();
@@ -155,8 +171,12 @@ public class ParquetRecordReaderTest {
           if (VERBOSE_DEBUG){
             System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 
0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
           }
-          assertField(vv, j, (TypeProtos.MinorType) currentField.type,
-              currentField.values[(int) (columnValCounter % 3)], (String) 
currentField.name + "/");
+          if (checkValues) {
+            try {
+              assertField(vv, j, (TypeProtos.MinorType) currentField.type,
+                currentField.values[(int) (columnValCounter % 3)], (String) 
currentField.name + "/");
+            } catch (AssertionError e) { submissionFailed(new 
RpcException(e)); }
+          }
           columnValCounter++;
         }
         if (VERBOSE_DEBUG){
@@ -197,7 +217,9 @@ public class ParquetRecordReaderTest {
       batchCounter++;
       if(result.getHeader().getIsLastChunk()){
         for (String s : valuesChecked.keySet()) {
+          try {
           assertEquals("Record count incorrect for column: " + s, 
totalRecords, (long) valuesChecked.get(s));
+          } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
         }
         
         assert valuesChecked.keySet().size() > 0;
@@ -222,11 +244,13 @@ public class ParquetRecordReaderTest {
     
     DrillConfig config = DrillConfig.create();
 
+    checkValues = false;
+
     try(DrillClient client = new DrillClient(config);){
       client.connect();
       RecordBatchLoader batchLoader = new 
RecordBatchLoader(client.getAllocator());
       ParquetResultListener resultListener = new 
ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, 
numberOfTimesRead);
-      client.runQuery(UserProtos.QueryType.LOGICAL, 
Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), 
resultListener);
+      client.runQuery(UserProtos.QueryType.PHYSICAL, 
Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), 
resultListener);
       resultListener.get();
     }
     
@@ -259,6 +283,28 @@ public class ParquetRecordReaderTest {
   }
 
 
+  //use this method to submit physical plan
+  public void testParquetFullEngineLocalTextDistributed(String planName, 
String filename, int numberOfTimesRead /* specified in json plan */, int 
numberOfRowGroups, int recordsPerRowGroup) throws Exception{
+
+    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    checkValues = false;
+
+    DrillConfig config = DrillConfig.create();
+
+    try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = 
new DrillClient(config, serviceSet.getCoordinator());){
+      bit1.run();
+      client.connect();
+      RecordBatchLoader batchLoader = new 
RecordBatchLoader(client.getAllocator());
+      ParquetResultListener resultListener = new 
ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, 
numberOfTimesRead);
+      Stopwatch watch = new Stopwatch().start();
+      client.runQuery(UserProtos.QueryType.PHYSICAL, 
Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8), 
resultListener);
+      resultListener.get();
+      System.out.println(String.format("Took %d ms to run query", 
watch.elapsed(TimeUnit.MILLISECONDS)));
+
+    }
+
+  }
 
   public String pad(String value, int length) {
     return pad(value, length, " ");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
----------------------------------------------------------------------
diff --git 
a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
 
b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
index f508d09..5efecaf 100644
--- 
a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
+++ 
b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
@@ -11,10 +11,7 @@
     @id : 1,
     entries : [
     {
-        path : "/tmp/testParquetFile_many_types_3"
-    },
-    {
-        path : "/tmp/testParquetFile_many_types_3"
+        path : "/tmp/parquet_test_file_many_types"
     }
     ],
     storageengine:{

Reply via email to