Hi,

When I run mvn install after these changes,
org.apache.drill.exec.store.parquet.ParquetRecordReaderTest hangs. The output
stops here:

Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.596 sec - in org.apache.drill.exec.expr.ExpressionTest
Running org.apache.drill.exec.store.TestAffinityCalculator
Took 0.616287 ms to build range map
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.137 sec - in org.apache.drill.exec.store.TestAffinityCalculator
Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

The environment is Fedora 18 with OpenJDK 1.7.

With -DskipTests everything compiles fine.
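
To narrow it down I can run that one class by itself and grab a thread dump
while it is stuck (the module path and the surefire flag are my assumptions
from the repo layout):

    cd sandbox/prototype/exec/java-exec
    mvn test -Dtest=ParquetRecordReaderTest
    # in another shell, while the test hangs:
    jps -l                          # find the forked surefire/test JVM
    jstack <pid> > hang-threads.txt # thread dump of the hung JVM

Happy to attach the dump if that is useful.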

Regards
Tanujit



On Fri, Aug 23, 2013 at 5:36 AM, <[email protected]> wrote:

> DRILL-176: Updates to affinity calculator, fixes for parquet serialization. Fix to ErrorHelper looping
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617
>
> Branch: refs/heads/master
> Commit: 7edd36170a9be291a69e44f6090474193485bf14
> Parents: d6ae53e
> Author: Steven Phillips <[email protected]>
> Authored: Thu Aug 22 16:18:55 2013 -0700
> Committer: Jacques Nadeau <[email protected]>
> Committed: Thu Aug 22 16:18:55 2013 -0700
>
> ----------------------------------------------------------------------
>  .../drill/exec/planner/fragment/Wrapper.java    |   5 +-
>  .../drill/exec/store/AffinityCalculator.java    |  91 ++++++----
>  .../exec/store/parquet/ParquetGroupScan.java    | 177 +++++++++----------
>  .../exec/store/parquet/ParquetRecordReader.java |   2 +-
>  .../store/parquet/ParquetScanBatchCreator.java  |  10 +-
>  .../drill/exec/work/foreman/ErrorHelper.java    |   8 +-
>  .../exec/store/TestParquetPhysicalPlan.java     |  55 +++++-
>  .../store/parquet/ParquetRecordReaderTest.java  |  52 +++++-
>  .../parquet_scan_union_screen_physical.json     |   5 +-
>  9 files changed, 257 insertions(+), 148 deletions(-)
> ----------------------------------------------------------------------
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> index d5a24b0..8c4b0b4 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> @@ -151,15 +151,12 @@ public class Wrapper {
>        for (int i = start; i < start + width; i++) {
>          endpoints.add(all.get(i % div));
>        }
> -    } else if (values.size() < width) {
> -      throw new NotImplementedException(
> -          "Haven't implemented a scenario where we have some node affinity but the affinity list is smaller than the expected width.");
>      } else {
>        // get nodes with highest affinity.
>        Collections.sort(values);
>        values = Lists.reverse(values);
>        for (int i = 0; i < width; i++) {
> -        endpoints.add(values.get(i).getEndpoint());
> +        endpoints.add(values.get(i%values.size()).getEndpoint());
>        }
>      }
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> index b4092cc..b341ea4 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> @@ -1,6 +1,7 @@
>  package org.apache.drill.exec.store;
>
>
> +import com.google.common.base.Stopwatch;
>  import com.google.common.collect.ImmutableRangeMap;
>  import com.google.common.collect.Range;
>  import org.apache.drill.exec.store.parquet.ParquetGroupScan;
> @@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
>
>  import java.io.IOException;
>  import java.util.*;
> +import java.util.concurrent.TimeUnit;
>
>  public class AffinityCalculator {
>    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
> @@ -24,6 +26,7 @@ public class AffinityCalculator {
>    String fileName;
>    Collection<DrillbitEndpoint> endpoints;
>    HashMap<String,DrillbitEndpoint> endPointMap;
> +  Stopwatch watch = new Stopwatch();
>
>    public AffinityCalculator(String fileName, FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
>      this.fs = fs;
> @@ -33,16 +36,20 @@ public class AffinityCalculator {
>      buildEndpointMap();
>    }
>
> +  /**
> +   * Builds a mapping of block locations to file byte range
> +   */
>    private void buildBlockMap() {
>      try {
> +      watch.start();
>        FileStatus file = fs.getFileStatus(new Path(fileName));
> -      long tC = System.nanoTime();
>        blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
> -      long tD = System.nanoTime();
> +      watch.stop();
>        logger.debug("Block locations: {}", blocks);
> -      logger.debug("Took {} ms to get Block locations", (float)(tD - tC)
> / 1e6);
> +      logger.debug("Took {} ms to get Block locations",
> watch.elapsed(TimeUnit.MILLISECONDS));
>      } catch (IOException ioe) { throw new RuntimeException(ioe); }
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
>      for (BlockLocation block : blocks) {
>        long start = block.getOffset();
> @@ -51,62 +58,72 @@ public class AffinityCalculator {
>        blockMapBuilder = blockMapBuilder.put(range, block);
>      }
>      blockMap = blockMapBuilder.build();
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
> +    watch.stop();
> +    logger.debug("Took {} ms to build block map",
> watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>    /**
> +   * For a given RowGroup, calculate how many bytes are available on each drillbit endpoint
>     *
> -   * @param entry
> +   * @param rowGroup the RowGroup to calculate endpoint bytes for
>     */
> -  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
> -    long tA = System.nanoTime();
> +  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
> +    watch.reset();
> +    watch.start();
>      HashMap<String,Long> hostMap = new HashMap<>();
> -    long start = entry.getStart();
> -    long end = start + entry.getLength();
> -    Range<Long> entryRange = Range.closedOpen(start, end);
> -    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(entryRange);
> -    for (Map.Entry<Range<Long>,BlockLocation> e : subRangeMap.asMapOfRanges().entrySet()) {
> -      String[] hosts = null;
> -      Range<Long> blockRange = e.getKey();
> +    HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
> +    long start = rowGroup.getStart();
> +    long end = start + rowGroup.getLength();
> +    Range<Long> rowGroupRange = Range.closedOpen(start, end);
> +
> +    // Find submap of ranges that intersect with the rowGroup
> +    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
> +
> +    // Iterate through each block in this submap and get the host for the block location
> +    for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
> +      String[] hosts;
> +      Range<Long> blockRange = block.getKey();
>        try {
> -        hosts = e.getValue().getHosts();
> -      } catch (IOException ioe) { /*TODO Handle this exception */}
> -      Range<Long> intersection = entryRange.intersection(blockRange);
> +        hosts = block.getValue().getHosts();
> +      } catch (IOException ioe) {
> +        throw new RuntimeException("Failed to get hosts for block location", ioe);
> +      }
> +      Range<Long> intersection = rowGroupRange.intersection(blockRange);
>        long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
> +
> +      // For each host in the current block location, add the intersecting bytes to the corresponding endpoint
>        for (String host : hosts) {
> -        if (hostMap.containsKey(host)) {
> -          hostMap.put(host, hostMap.get(host) + bytes);
> +        DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
> +        if (endpointByteMap.containsKey(endpoint)) {
> +          endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) + bytes);
>          } else {
> -          hostMap.put(host, bytes);
> +          if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
>          }
>        }
>      }
> -    HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
> -    try {
> -      for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
> -        String host = hostEntry.getKey();
> -        Long bytes = hostEntry.getValue();
> -        DrillbitEndpoint d = getDrillBitEndpoint(host);
> -        if (d != null ) ebs.put(d, bytes);
> -      }
> -    } catch (NullPointerException n) {}
> -    entry.setEndpointBytes(ebs);
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) /
> 1e6);
> +
> +    rowGroup.setEndpointBytes(endpointByteMap);
> +    rowGroup.setMaxBytes(endpointByteMap.size() > 0 ? Collections.max(endpointByteMap.values()) : 0);
> +    logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(), rowGroup.getStart(), rowGroup.getMaxBytes());
> +    watch.stop();
> +    logger.debug("Took {} ms to set endpoint bytes", watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>
>    private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
>      return endPointMap.get(hostName);
>    }
>
> +  /**
> +   * Builds a mapping of drillbit endpoints to hostnames
> +   */
>    private void buildEndpointMap() {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      endPointMap = new HashMap<String, DrillbitEndpoint>();
>      for (DrillbitEndpoint d : endpoints) {
>        String hostName = d.getAddress();
>        endPointMap.put(hostName, d);
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) /
> 1e6);
> +    watch.stop();
> +    logger.debug("Took {} ms to build endpoint map",
> watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> index 9e48d33..64ced87 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> @@ -18,14 +18,13 @@
>  package org.apache.drill.exec.store.parquet;
>
>  import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.Collection;
> -import java.util.Collections;
> -import java.util.Comparator;
> -import java.util.HashMap;
> -import java.util.LinkedList;
> -import java.util.List;
> +import java.util.*;
> +import java.util.concurrent.TimeUnit;
>
> +import com.google.common.base.Stopwatch;
> +import com.google.common.collect.ArrayListMultimap;
> +import com.google.common.collect.Lists;
> +import com.google.common.collect.Multimap;
>  import org.apache.drill.common.config.DrillConfig;
>  import org.apache.drill.exec.exception.SetupException;
>  import org.apache.drill.exec.physical.EndpointAffinity;
> @@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
>  public class ParquetGroupScan extends AbstractGroupScan {
>    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
>
> -  private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
> +  private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings;
>    private List<RowGroupInfo> rowGroupInfos;
> +  private Stopwatch watch = new Stopwatch();
>
>    public List<ReadEntryWithPath> getEntries() {
>      return entries;
> @@ -110,16 +110,14 @@ public class ParquetGroupScan extends AbstractGroupScan {
>    }
>
>    private void readFooter() throws IOException {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      rowGroupInfos = new ArrayList();
>      long start = 0, length = 0;
>      ColumnChunkMetaData columnChunkMetaData;
>      for (ReadEntryWithPath readEntryWithPath : entries){
>        Path path = new Path(readEntryWithPath.getPath());
>        ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
> -//      FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig());
> -//      FileStatus status = fs.getFileStatus(path);
> -//      ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
>        readEntryWithPath.getPath();
>
>        int i = 0;
> @@ -138,38 +136,21 @@ public class ParquetGroupScan extends AbstractGroupScan {
>          i++;
>        }
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to get row group infos", (float)(tB - tA) /
> 1E6);
> +    watch.stop();
> +    logger.debug("Took {} ms to get row group infos",
> watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>
>    private void calculateEndpointBytes() {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      AffinityCalculator ac = new AffinityCalculator(fileName, fs, availableEndpoints);
>      for (RowGroupInfo e : rowGroupInfos) {
>        ac.setEndpointBytes(e);
>        totalBytes += e.getLength();
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB -
> tA) / 1E6);
> +    watch.stop();
> +    logger.debug("Took {} ms to calculate EndpointBytes",
> watch.elapsed(TimeUnit.MILLISECONDS));
>    }
> -/*
> -  public LinkedList<RowGroupInfo> getRowGroups() {
> -    return rowGroups;
> -  }
> -
> -  public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
> -    this.rowGroups = rowGroups;
> -  }
> -
> -  public static class ParquetFileReadEntry {
> -
> -    String path;
> -
> -    public ParquetFileReadEntry(@JsonProperty String path){
> -      this.path = path;
> -    }
> -  }
> -  */
>
>    @JsonIgnore
>    public FileSystem getFileSystem() {
> @@ -232,16 +213,22 @@ public class ParquetGroupScan extends AbstractGroupScan {
>      }
>    }
>
> +  /**
> +   * Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each rowGroup
> +   * @return a list of EndpointAffinity objects
> +   */
>    @Override
>    public List<EndpointAffinity> getOperatorAffinity() {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      if (this.endpointAffinities == null) {
>        HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
>        for (RowGroupInfo entry : rowGroupInfos) {
>          for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
>            long bytes = entry.getEndpointBytes().get(d);
>            float affinity = (float)bytes / (float)totalBytes;
> -          logger.error("RowGroup: {} Endpoint: {} Bytes: {}",
> entry.getRowGroupIndex(), d.getAddress(), bytes);
> +          logger.debug("RowGroup: {} Endpoint: {} Bytes: {}",
> entry.getRowGroupIndex(), d.getAddress(), bytes);
>            if (affinities.keySet().contains(d)) {
>              affinities.put(d, affinities.get(d) + affinity);
>            } else {
> @@ -256,83 +243,90 @@ public class ParquetGroupScan extends AbstractGroupScan {
>        }
>        this.endpointAffinities = affinityList;
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to get operator affinity", (float)(tB - tA)
> / 1E6);
> +    watch.stop();
> +    logger.debug("Took {} ms to get operator affinity",
> watch.elapsed(TimeUnit.MILLISECONDS));
>      return this.endpointAffinities;
>    }
>
>
> +  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
>
> -
> +  /**
> +   *
> +   * @param incomingEndpoints
> +   */
>    @Override
> -  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
> -    long tA = System.nanoTime();
> -    Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
> -
> -    int i = 0;
> -    for (DrillbitEndpoint endpoint : endpoints) {
> -      logger.debug("Endpoint index {}, endpoint host: {}", i++,
> endpoint.getAddress());
> +  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
> +    watch.reset();
> +    watch.start();
> +    Preconditions.checkArgument(incomingEndpoints.size() <=
> rowGroupInfos.size());
> +    mappings = ArrayListMultimap.create();
> +    ArrayList rowGroupList = new ArrayList(rowGroupInfos);
> +    List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints);
> +    for(double cutoff : ASSIGNMENT_CUTOFFS ){
> +      scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false);
>      }
> -
> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
> -    mappings = new LinkedList[endpoints.size()];
> -    LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, rowGroupInfos, 100, true, false);
> -    LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, unassigned, 50, true, false);
> -    LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, unassigned2, 25, true, false);
> -    LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, unassigned3, 0, false, false);
> -    LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, unassigned4, 0, false, true);
> -    assert unassigned5.size() == 0 : String.format("All readEntries should be assigned by now, but some are still unassigned");
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / 1E6);
> +    scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
> +    watch.stop();
> +    logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
> +    Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
> +    Preconditions.checkArgument(!rowGroupInfos.isEmpty());
>    }
>
> -  private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean mustContain, boolean assignAll) {
> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
> -    LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
> -
> -    int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() * 1.5);
> +  public int fragmentPointer = 0;
> +
> +  /**
> +   *
> +   * @param endpointAssignments the mapping between fragment/endpoint and rowGroup
> +   * @param endpoints the list of drillbits, ordered by the corresponding fragment
> +   * @param rowGroups the list of rowGroups to assign
> +   * @param requiredPercentage the percentage of max bytes required to make an assignment
> +   * @param assignAll if true, will assign even if no affinity
> +   */
> +  private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) {
> +    Collections.sort(rowGroups, new ParquetReadEntryComparator());
> +    final boolean requireAffinity = requiredPercentage > 0;
> +    int maxAssignments = (int) (rowGroups.size() / endpoints.size());
> +
> +    if (maxAssignments < 1) maxAssignments = 1;
> +
> +    for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); iter.hasNext();){
> +      RowGroupInfo rowGroupInfo = iter.next();
> +      for (int i = 0; i < endpoints.size(); i++) {
> +        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
> +        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
> +        Map<DrillbitEndpoint, Long> bytesPerEndpoint = rowGroupInfo.getEndpointBytes();
> +        boolean haveAffinity = bytesPerEndpoint.containsKey(currentEndpoint);
>
> -    if (maxEntries < 1) maxEntries = 1;
> -
> -    int i =0;
> -    for(RowGroupInfo e : rowGroups) {
> -      boolean assigned = false;
> -      for (int j = i; j < i + endpoints.size(); j++) {
> -        DrillbitEndpoint currentEndpoint = endpoints.get(j%endpoints.size());
>          if (assignAll ||
> -                (e.getEndpointBytes().size() > 0 &&
> -                (e.getEndpointBytes().containsKey(currentEndpoint) || !mustContain) &&
> -                (mappings[j%endpoints.size()] == null || mappings[j%endpoints.size()].size() < maxEntries) &&
> -                e.getEndpointBytes().get(currentEndpoint) >= e.getMaxBytes() * requiredPercentage / 100)) {
> -          LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = mappings[j%endpoints.size()];
> -          if(entries == null){
> -            entries = new LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
> -            mappings[j%endpoints.size()] = entries;
> -          }
> -          entries.add(e.getRowGroupReadEntry());
> -          logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", e.getPath(), e.getStart(), currentEndpoint.getAddress());
> -          assigned = true;
> +                (!bytesPerEndpoint.isEmpty() &&
> +                        (!requireAffinity || haveAffinity) &&
> +                        (!endpointAssignments.containsKey(minorFragmentId) || endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
> +                        bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage)) {
> +
> +          endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry());
> +          logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress());
> +          iter.remove();
> +          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
>            break;
>          }
>        }
> -      if (!assigned) unassigned.add(e);
> -      i++;
> +
>      }
> -    return unassigned;
>    }
>
>    @Override
>    public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
> -    assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
> -    for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) {
> +    assert minorFragmentId < mappings.size() : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), minorFragmentId);
> +    for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings.get(minorFragmentId)) {
>        logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
>      }
> +    Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
>      try {
> -      return new ParquetRowGroupScan(storageEngine, engineConfig, mappings[minorFragmentId]);
> +      return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId));
>      } catch (SetupException e) {
> -      e.printStackTrace(); // TODO - fix this
> +      throw new RuntimeException("Error setting up ParquetRowGroupScan", e);
>      }
> -    return null;
>    }
>
>    @Override
> @@ -342,7 +336,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
>
>    @Override
>    public OperatorCost getCost() {
> -    return new OperatorCost(1,1,1,1);
> +    //TODO Figure out how to properly calculate cost
> +    return new OperatorCost(1,rowGroupInfos.size(),1,1);
>    }
>
>    @Override
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> index 4e46034..3aaa987 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> @@ -211,8 +211,8 @@ public class ParquetRecordReader implements RecordReader {
>        }
>        for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
>          output.addField(r.valueVecHolder.getValueVector());
> -        output.setNewSchema();
>        }
> +      output.setNewSchema();
>      }catch(SchemaChangeException e) {
>        throw new ExecutionSetupException("Error setting up output mutator.", e);
>      }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> index 03fb4ec..addd288 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> @@ -21,7 +21,9 @@ import java.io.IOException;
>  import java.text.SimpleDateFormat;
>  import java.util.Date;
>  import java.util.List;
> +import java.util.concurrent.TimeUnit;
>
> +import com.google.common.base.Stopwatch;
>  import org.apache.drill.common.exceptions.ExecutionSetupException;
>  import org.apache.drill.exec.ops.FragmentContext;
>  import org.apache.drill.exec.physical.impl.BatchCreator;
> @@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
>  import parquet.hadoop.metadata.ParquetMetadata;
>
>  public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
> -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
>
>    @Override
>    public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
> -    long tA = System.nanoTime(), tB;
> -    System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ScanBatCreator.scanBatch");
> +    Stopwatch watch = new Stopwatch();
> +    watch.start();
>      Preconditions.checkArgument(children.isEmpty());
>      List<RecordReader> readers = Lists.newArrayList();
>      for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
> @@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
>          throw new ExecutionSetupException(e1);
>        }
>      }
> -    System.out.println( "Total time in method: " + ((float)
> (System.nanoTime() - tA) / 1e9));
> +    logger.debug("total time in ScanBatchCreator.getBatch: {} ms",
> watch.elapsed(TimeUnit.MILLISECONDS));
>      return new ScanBatch(context, readers.iterator());
>    }
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> index 9a33109..72c5f34 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> @@ -35,8 +35,8 @@ public class ErrorHelper {
>      if(message != null){
>        sb.append(message);
>      }
> -
> -    do{
> +
> +    while (true) {
>        sb.append(" < ");
>        sb.append(t.getClass().getSimpleName());
>        if(t.getMessage() != null){
> @@ -44,7 +44,9 @@ public class ErrorHelper {
>          sb.append(t.getMessage());
>          sb.append(" ]");
>        }
> -    }while(t.getCause() != null && t.getCause() != t);
> +      if (t.getCause() == null || t.getCause() == t) break;
> +      t = t.getCause();
> +    }
>
>      builder.setMessage(sb.toString());
>
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> index e2a00f1..18ac294 100644
> --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
> @@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan;
>  import org.apache.drill.exec.planner.PhysicalPlanReader;
>  import org.apache.drill.exec.proto.CoordinationProtos;
>  import org.apache.drill.exec.proto.UserProtos;
> +import org.apache.drill.exec.record.RecordBatchLoader;
> +import org.apache.drill.exec.record.VectorWrapper;
> +import org.apache.drill.exec.rpc.RpcException;
>  import org.apache.drill.exec.rpc.user.QueryResultBatch;
> +import org.apache.drill.exec.rpc.user.UserResultsListener;
> +import org.apache.drill.exec.server.BootStrapContext;
>  import org.apache.drill.exec.server.Drillbit;
>  import org.apache.drill.exec.server.RemoteServiceSet;
>  import org.apache.drill.exec.store.parquet.ParquetGroupScan;
> +import org.apache.drill.exec.vector.ValueVector;
>  import org.apache.hadoop.fs.BlockLocation;
>  import org.apache.hadoop.fs.FileStatus;
>  import org.apache.hadoop.fs.FileSystem;
> @@ -29,6 +35,7 @@ import java.io.IOException;
>  import java.nio.charset.Charset;
>  import java.util.LinkedList;
>  import java.util.List;
> +import java.util.concurrent.CountDownLatch;
>
>  import static junit.framework.Assert.assertNull;
>  import static org.junit.Assert.assertEquals;
> @@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan {
>
>    //public String fileName = "/physical_test2.json";
>    public String fileName = "parquet_scan_union_screen_physical.json";
> +//  public String fileName = "parquet-sample.json";
> +
>
>    @Test
>    @Ignore
> @@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan {
>        bit1.run();
>        client.connect();
>        List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
> -      System.out.println(String.format("Got %d results", results.size()));
> +      RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator());
> +      for (QueryResultBatch b : results) {
> +        System.out.println(String.format("Got %d results", b.getHeader().getRowCount()));
> +        loader.load(b.getHeader().getDef(), b.getData());
> +        for (VectorWrapper vw : loader) {
> +          System.out.println(vw.getValueVector().getField().getName());
> +          ValueVector vv = vw.getValueVector();
> +          for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
> +            Object o = vv.getAccessor().getObject(i);
> +            System.out.println(vv.getAccessor().getObject(i));
> +          }
> +        }
> +      }
> +      client.close();
> +    }
> +  }
> +
> +  private class ParquetResultsListener implements UserResultsListener {
> +    private CountDownLatch latch = new CountDownLatch(1);
> +    @Override
> +    public void submissionFailed(RpcException ex) {
> +      logger.error("submission failed", ex);
> +      latch.countDown();
> +    }
> +
> +    @Override
> +    public void resultArrived(QueryResultBatch result) {
> +      System.out.printf("Result batch arrived. Number of records: %d",
> result.getHeader().getRowCount());
> +      if (result.getHeader().getIsLastChunk()) latch.countDown();
> +    }
> +
> +    public void await() throws Exception {
> +      latch.await();
> +    }
> +  }
> +  @Test
> +  @Ignore
> +  public void testParseParquetPhysicalPlanRemote() throws Exception {
> +    DrillConfig config = DrillConfig.create();
> +
> +    try(DrillClient client = new DrillClient(config);){
> +      client.connect();
> +      ParquetResultsListener listener = new ParquetResultsListener();
> +      client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), listener);
> +      listener.await();
>        client.close();
>      }
>    }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> index 1d91455..7a99c3f 100644
> --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
> @@ -48,6 +48,7 @@ import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
>  import org.apache.drill.exec.vector.BaseDataValueVector;
>  import org.apache.drill.exec.vector.ValueVector;
>  import org.junit.BeforeClass;
> +import org.junit.Ignore;
>  import org.junit.Test;
>
>  import parquet.bytes.BytesInput;
> @@ -68,6 +69,7 @@ public class ParquetRecordReaderTest {
>    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
>
>    private boolean VERBOSE_DEBUG = false;
> +  private boolean checkValues = true;
>
>    static final int numberRowGroups = 20;
>    static final int recordsPerRowGroup = 300000;
> @@ -100,6 +102,20 @@ public class ParquetRecordReaderTest {
>      testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup);
>    }
>
> +  @Test
> +  @Ignore
> +  public void testLocalDistributed() throws Exception {
> +    String planName = "/parquet/parquet_scan_union_screen_physical.json";
> +    testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, 300000);
> +  }
> +
> +  @Test
> +  @Ignore
> +  public void testRemoteDistributed() throws Exception {
> +    String planName = "/parquet/parquet_scan_union_screen_physical.json";
> +    testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
> +  }
> +
>
>    private class ParquetResultListener implements UserResultsListener {
>      private SettableFuture<Void> future = SettableFuture.create();
> @@ -155,8 +171,12 @@ public class ParquetRecordReaderTest {
>            if (VERBOSE_DEBUG){
>              System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
>            }
> -          assertField(vv, j, (TypeProtos.MinorType) currentField.type,
> -              currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
> +          if (checkValues) {
> +            try {
> +              assertField(vv, j, (TypeProtos.MinorType) currentField.type,
> +                currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/");
> +            } catch (AssertionError e) { submissionFailed(new RpcException(e)); }
> +          }
>            columnValCounter++;
>          }
>          if (VERBOSE_DEBUG){
> @@ -197,7 +217,9 @@ public class ParquetRecordReaderTest {
>        batchCounter++;
>        if(result.getHeader().getIsLastChunk()){
>          for (String s : valuesChecked.keySet()) {
> +          try {
>            assertEquals("Record count incorrect for column: " + s,
> totalRecords, (long) valuesChecked.get(s));
> +          } catch (AssertionError e) { submissionFailed(new
> RpcException(e)); }
>          }
>
>          assert valuesChecked.keySet().size() > 0;
> @@ -222,11 +244,13 @@ public class ParquetRecordReaderTest {
>
>      DrillConfig config = DrillConfig.create();
>
> +    checkValues = false;
> +
>      try(DrillClient client = new DrillClient(config);){
>        client.connect();
>        RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
>        ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
> -      client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
> +      client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener);
>        resultListener.get();
>      }
>
> @@ -259,6 +283,28 @@ public class ParquetRecordReaderTest {
>    }
>
>
> +  //use this method to submit physical plan
> +  public void testParquetFullEngineLocalTextDistributed(String planName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{
> +
> +    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
> +
> +    checkValues = false;
> +
> +    DrillConfig config = DrillConfig.create();
> +
> +    try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){
> +      bit1.run();
> +      client.connect();
> +      RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator());
> +      ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead);
> +      Stopwatch watch = new Stopwatch().start();
> +      client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8), resultListener);
> +      resultListener.get();
> +      System.out.println(String.format("Took %d ms to run query", watch.elapsed(TimeUnit.MILLISECONDS)));
> +
> +    }
> +
> +  }
>
>    public String pad(String value, int length) {
>      return pad(value, length, " ");
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> index f508d09..5efecaf 100644
> --- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> +++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
> @@ -11,10 +11,7 @@
>      @id : 1,
>      entries : [
>      {
> -        path : "/tmp/testParquetFile_many_types_3"
> -    },
> -    {
> -        path : "/tmp/testParquetFile_many_types_3"
> +        path : "/tmp/parquet_test_file_many_types"
>      }
>      ],
>      storageengine:{
>
>


-- 
Regards,
Tanujit
