Here is a more detailed output; I ran just that particular test this time.

[tanujit@legolas java-exec]$ mvn -Dtest=ParquetRecordReaderTest test
[INFO] Scanning for projects...
[INFO]

[INFO]
------------------------------------------------------------------------
[INFO] Building java-exec 1.0-SNAPSHOT
[INFO]
------------------------------------------------------------------------
[INFO]
[INFO] --- maven-resources-plugin:2.6:copy-resources (copy-resources) @
java-exec ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 369 resources
[INFO]
[INFO] --- maven-enforcer-plugin:1.2:enforce (no_commons_logging) @
java-exec ---
[INFO]
[INFO] --- maven-antrun-plugin:1.6:run (generate-sources) @ java-exec ---
[WARNING] Parameter tasks is deprecated, use target instead
[INFO] Executing tasks

main:
[INFO] Executed tasks
[INFO] Registering compile source root
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/generated-sources
[INFO]
[INFO] --- fmpp-maven-plugin:1.0:generate (generate-sources) @ java-exec ---
- Executing: ValueHolders.java
log4j:WARN No appenders could be found for logger (freemarker.cache).
log4j:WARN Please initialize the log4j system properly.
- Executing: VariableLengthVectors.java
- Executing: FixedValueVectors.java
- Executing: TypeHelper.java
- Executing: NullableValueVectors.java
- Executing: RepeatedValueVectors.java
[INFO] Done
[INFO]
[INFO] --- maven-remote-resources-plugin:1.1:process (default) @ java-exec
---
[INFO] Setting property: classpath.resource.loader.class =>
'org.codehaus.plexus.velocity.ContextClassLoaderResourceLoader'.
[INFO] Setting property: velocimacro.messages.on => 'false'.
[INFO] Setting property: resource.loader => 'classpath'.
[INFO] Setting property: resource.manager.logwhenfound => 'false'.
[INFO]
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @
java-exec ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.0:compile (default-compile) @ java-exec
---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 459 source files to
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/classes
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java:[20,21]
com.sun.corba.se.impl.interceptors.CodecFactoryImpl is internal proprietary
API and may be removed in a future release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java:
Some input files use or override a deprecated API.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java:
Recompile with -Xlint:deprecation for details.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java:
Some input files use unchecked or unsafe operations.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java:
Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources)
@ java-exec ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 36 resources
[INFO] Copying 3 resources
[INFO]
[INFO] --- maven-compiler-plugin:3.0:testCompile (default-testCompile) @
java-exec ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 126 source files to
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/test-classes
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[25,16]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[53,17]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[94,12]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[95,14]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[98,26]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[100,27]
sun.misc.Unsafe is internal proprietary API and may be removed in a future
release
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java:
Some input files use or override a deprecated API.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java:
Recompile with -Xlint:deprecation for details.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java:
Some input files use unchecked or unsafe operations.
[WARNING]
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java:
Recompile with -Xlint:unchecked for details.
[INFO]
[INFO] --- maven-surefire-plugin:2.15:test (default-test) @ java-exec ---
[INFO] Surefire report directory:
/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/surefire-reports

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
08:18:15.598 [main] DEBUG org.reflections.Reflections - going to scan these
urls:
jar:file:/home/tanujit/.m2/repository/org/apache/drill/common/1.0-SNAPSHOT/common-1.0-SNAPSHOT.jar!/
file:/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/test-classes/
file:/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/classes/
jar:file:/home/tanujit/.m2/repository/org/apache/drill/common/1.0-SNAPSHOT/common-1.0-SNAPSHOT-tests.jar!/

08:18:16.124 [main] INFO  org.reflections.Reflections - Reflections took
524 ms to scan 4 urls, producing 667 keys and 1620 values
08:18:16.148 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as
the default logging framework
08:18:16.150 [main] DEBUG i.n.c.MultithreadEventLoopGroup -
-Dio.netty.eventLoopThreads: 8
08:18:16.165 [main] DEBUG i.n.util.internal.PlatformDependent - UID: 1000
08:18:16.165 [main] DEBUG i.n.util.internal.PlatformDependent - Java
version: 7
08:18:16.166 [main] DEBUG i.n.util.internal.PlatformDependent -
-Dio.netty.noUnsafe: false
08:18:16.167 [main] DEBUG i.n.util.internal.PlatformDependent0 -
java.nio.ByteBuffer.cleaner: available
08:18:16.167 [main] DEBUG i.n.util.internal.PlatformDependent0 -
java.nio.Buffer.address: available
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 -
sun.misc.Unsafe.theUnsafe: available
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 -
sun.misc.Unsafe.copyMemory: available
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 -
java.nio.Bits.unaligned: true
08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent -
sun.misc.Unsafe: available
08:18:16.169 [main] DEBUG i.n.util.internal.PlatformDependent -
-Dio.netty.noJavassist: false
08:18:16.227 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist:
available
08:18:16.228 [main] DEBUG i.n.util.internal.PlatformDependent -
-Dio.netty.noPreferDirect: false
08:18:16.249 [main] DEBUG io.netty.channel.nio.NioEventLoop -
-Dio.netty.noKeySetOptimization: false
08:18:16.249 [main] DEBUG io.netty.channel.nio.NioEventLoop -
-Dio.netty.selectorAutoRebuildThreshold: 512
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.numHeapArenas: 4
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.numDirectArenas: 4
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.pageSize: 8192
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.maxOrder: 11
08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL -
-Dio.netty.allocator.chunkSize: 16777216
08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo
08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback address:
/0:0:0:0:0:0:0:1%1 (primary)
08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback address: /
127.0.0.1
08:18:16.335 [main] DEBUG io.netty.util.NetUtil -
/proc/sys/net/core/somaxconn: 128
08:18:16.344 [UserServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x30e3b962] REGISTERED
08:18:16.347 [UserServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x30e3b962] BIND(0.0.0.0/0.0.0.0:31010)
08:18:16.350 [UserServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x30e3b962, /0:0:0:0:0:0:0:0:31010] ACTIVE
08:18:16.366 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b] REGISTERED
08:18:16.367 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b] BIND(0.0.0.0/0.0.0.0:31011)
08:18:16.367 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] ACTIVE
08:18:16.571 [Client-1] DEBUG
i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated:
io.netty.util.internal.__matchers__.io.netty.buffer.ByteBufMatcher
08:18:16.574 [Client-1] DEBUG
i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated:
io.netty.util.internal.__matchers__.org.apache.drill.exec.rpc.OutboundRpcMessageMatcher
08:18:16.576 [Client-1] DEBUG
i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated:
io.netty.util.internal.__matchers__.org.apache.drill.exec.rpc.InboundRpcMessageMatcher
08:18:16.579 [UserServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x30e3b962, /0:0:0:0:0:0:0:0:31010] RECEIVED: [id: 0xf33275f9, /
192.168.0.102:34995 => /192.168.0.102:31010]
08:18:16.589 [Client-1] DEBUG io.netty.util.ResourceLeakDetector -
-Dio.netty.noResourceLeakDetection: false
08:18:16.770 [WorkManager-1] DEBUG org.apache.hadoop.security.Groups -
 Creating new Groups object
08:18:16.817 [WorkManager-1] DEBUG org.apache.hadoop.security.Groups -
Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
cacheTimeout=300000
08:18:16.882 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
hadoop login
08:18:16.882 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
hadoop login commit
08:18:16.898 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
using local user:UnixPrincipal: tanujit
08:18:16.900 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation -
UGI loginUser:tanujit
08:18:16.940 [WorkManager-1] DEBUG org.apache.hadoop.fs.FileSystem -
Creating filesystem for file:///
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
details.
08:18:17.922 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x1f148baa, /
192.168.0.102:45778 => /192.168.0.102:31011]
08:18:17.923 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xc4ed7542, /
192.168.0.102:45779 => /192.168.0.102:31011]
08:18:17.937 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x70a4a44c, /
192.168.0.102:45780 => /192.168.0.102:31011]
08:18:17.937 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xf6360725, /
192.168.0.102:45781 => /192.168.0.102:31011]
08:18:17.937 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x51467583, /
192.168.0.102:45782 => /192.168.0.102:31011]
08:18:17.938 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xaf6d72df, /
192.168.0.102:45783 => /192.168.0.102:31011]
08:18:17.938 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x6a8a1670, /
192.168.0.102:45784 => /192.168.0.102:31011]
08:18:17.938 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xf18205a3, /
192.168.0.102:45785 => /192.168.0.102:31011]
08:18:17.939 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xa09abc2c, /
192.168.0.102:45786 => /192.168.0.102:31011]
08:18:17.943 [BitServer-1] INFO  i.n.handler.logging.LoggingHandler - [id:
0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x3784f526, /
192.168.0.102:45787 => /192.168.0.102:31011]

On Sat, Aug 24, 2013 at 8:15 AM, Tanujit Ghosh <[email protected]>wrote:

> Hi,
>
> when I run mvn install after these changes,
> org.apache.drill.exec.store.parquet.ParquetRecordReaderTest hangs.
>
> Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.596 sec
> - in org.apache.drill.exec.expr.ExpressionTest
> Running org.apache.drill.exec.store.TestAffinityCalculator
> Took 0.616287 ms to build range map
> Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.137 sec
> - in org.apache.drill.exec.store.TestAffinityCalculator
> Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further
> details.
>
> Environment: Fedora 18, OpenJDK 1.7.
>
> With -DskipTests, everything compiles fine.
>
> Regards
> Tanujit
>
>
>
> On Fri, Aug 23, 2013 at 5:36 AM, <[email protected]> wrote:
>
>> DRILL-176:  Updates to affinity calculator, fixes for parquet
>> serialization.  Fix to ErrorHelper looping
>>
>>
>> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
>> Commit:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
>> Tree:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
>> Diff:
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617
>>
>> Branch: refs/heads/master
>> Commit: 7edd36170a9be291a69e44f6090474193485bf14
>> Parents: d6ae53e
>> Author: Steven Phillips <[email protected]>
>> Authored: Thu Aug 22 16:18:55 2013 -0700
>> Committer: Jacques Nadeau <[email protected]>
>> Committed: Thu Aug 22 16:18:55 2013 -0700
>>
>> ----------------------------------------------------------------------
>>  .../drill/exec/planner/fragment/Wrapper.java    |   5 +-
>>  .../drill/exec/store/AffinityCalculator.java    |  91 ++++++----
>>  .../exec/store/parquet/ParquetGroupScan.java    | 177 +++++++++----------
>>  .../exec/store/parquet/ParquetRecordReader.java |   2 +-
>>  .../store/parquet/ParquetScanBatchCreator.java  |  10 +-
>>  .../drill/exec/work/foreman/ErrorHelper.java    |   8 +-
>>  .../exec/store/TestParquetPhysicalPlan.java     |  55 +++++-
>>  .../store/parquet/ParquetRecordReaderTest.java  |  52 +++++-
>>  .../parquet_scan_union_screen_physical.json     |   5 +-
>>  9 files changed, 257 insertions(+), 148 deletions(-)
>> ----------------------------------------------------------------------
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> index d5a24b0..8c4b0b4 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
>> @@ -151,15 +151,12 @@ public class Wrapper {
>>        for (int i = start; i < start + width; i++) {
>>          endpoints.add(all.get(i % div));
>>        }
>> -    } else if (values.size() < width) {
>> -      throw new NotImplementedException(
>> -          "Haven't implemented a scenario where we have some node
>> affinity but the affinity list is smaller than the expected width.");
>>      } else {
>>        // get nodes with highest affinity.
>>        Collections.sort(values);
>>        values = Lists.reverse(values);
>>        for (int i = 0; i < width; i++) {
>> -        endpoints.add(values.get(i).getEndpoint());
>> +        endpoints.add(values.get(i%values.size()).getEndpoint());
>>        }
>>      }
>>
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> index b4092cc..b341ea4 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
>> @@ -1,6 +1,7 @@
>>  package org.apache.drill.exec.store;
>>
>>
>> +import com.google.common.base.Stopwatch;
>>  import com.google.common.collect.ImmutableRangeMap;
>>  import com.google.common.collect.Range;
>>  import org.apache.drill.exec.store.parquet.ParquetGroupScan;
>> @@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
>>
>>  import java.io.IOException;
>>  import java.util.*;
>> +import java.util.concurrent.TimeUnit;
>>
>>  public class AffinityCalculator {
>>    static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
>> @@ -24,6 +26,7 @@ public class AffinityCalculator {
>>    String fileName;
>>    Collection<DrillbitEndpoint> endpoints;
>>    HashMap<String,DrillbitEndpoint> endPointMap;
>> +  Stopwatch watch = new Stopwatch();
>>
>>    public AffinityCalculator(String fileName, FileSystem fs,
>> Collection<DrillbitEndpoint> endpoints) {
>>      this.fs = fs;
>> @@ -33,16 +36,20 @@ public class AffinityCalculator {
>>      buildEndpointMap();
>>    }
>>
>> +  /**
>> +   * Builds a mapping of block locations to file byte range
>> +   */
>>    private void buildBlockMap() {
>>      try {
>> +      watch.start();
>>        FileStatus file = fs.getFileStatus(new Path(fileName));
>> -      long tC = System.nanoTime();
>>        blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
>> -      long tD = System.nanoTime();
>> +      watch.stop();
>>        logger.debug("Block locations: {}", blocks);
>> -      logger.debug("Took {} ms to get Block locations", (float)(tD - tC)
>> / 1e6);
>> +      logger.debug("Took {} ms to get Block locations",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>      } catch (IOException ioe) { throw new RuntimeException(ioe); }
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new
>> ImmutableRangeMap.Builder<Long,BlockLocation>();
>>      for (BlockLocation block : blocks) {
>>        long start = block.getOffset();
>> @@ -51,62 +58,72 @@ public class AffinityCalculator {
>>        blockMapBuilder = blockMapBuilder.put(range, block);
>>      }
>>      blockMap = blockMapBuilder.build();
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to build block map", (float)(tB - tA) /
>> 1e6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to build block map",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>>    /**
>> +   * For a given RowGroup, calculate how many bytes are available on
>> each on drillbit endpoint
>>     *
>> -   * @param entry
>> +   * @param rowGroup the RowGroup to calculate endpoint bytes for
>>     */
>> -  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
>> -    long tA = System.nanoTime();
>> +  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
>> +    watch.reset();
>> +    watch.start();
>>      HashMap<String,Long> hostMap = new HashMap<>();
>> -    long start = entry.getStart();
>> -    long end = start + entry.getLength();
>> -    Range<Long> entryRange = Range.closedOpen(start, end);
>> -    ImmutableRangeMap<Long,BlockLocation> subRangeMap =
>> blockMap.subRangeMap(entryRange);
>> -    for (Map.Entry<Range<Long>,BlockLocation> e :
>> subRangeMap.asMapOfRanges().entrySet()) {
>> -      String[] hosts = null;
>> -      Range<Long> blockRange = e.getKey();
>> +    HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
>> +    long start = rowGroup.getStart();
>> +    long end = start + rowGroup.getLength();
>> +    Range<Long> rowGroupRange = Range.closedOpen(start, end);
>> +
>> +    // Find submap of ranges that intersect with the rowGroup
>> +    ImmutableRangeMap<Long,BlockLocation> subRangeMap =
>> blockMap.subRangeMap(rowGroupRange);
>> +
>> +    // Iterate through each block in this submap and get the host for
>> the block location
>> +    for (Map.Entry<Range<Long>,BlockLocation> block :
>> subRangeMap.asMapOfRanges().entrySet()) {
>> +      String[] hosts;
>> +      Range<Long> blockRange = block.getKey();
>>        try {
>> -        hosts = e.getValue().getHosts();
>> -      } catch (IOException ioe) { /*TODO Handle this exception */}
>> -      Range<Long> intersection = entryRange.intersection(blockRange);
>> +        hosts = block.getValue().getHosts();
>> +      } catch (IOException ioe) {
>> +        throw new RuntimeException("Failed to get hosts for block
>> location", ioe);
>> +      }
>> +      Range<Long> intersection = rowGroupRange.intersection(blockRange);
>>        long bytes = intersection.upperEndpoint() -
>> intersection.lowerEndpoint();
>> +
>> +      // For each host in the current block location, add the
>> intersecting bytes to the corresponding endpoint
>>        for (String host : hosts) {
>> -        if (hostMap.containsKey(host)) {
>> -          hostMap.put(host, hostMap.get(host) + bytes);
>> +        DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
>> +        if (endpointByteMap.containsKey(endpoint)) {
>> +          endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) +
>> bytes);
>>          } else {
>> -          hostMap.put(host, bytes);
>> +          if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
>>          }
>>        }
>>      }
>> -    HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
>> -    try {
>> -      for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
>> -        String host = hostEntry.getKey();
>> -        Long bytes = hostEntry.getValue();
>> -        DrillbitEndpoint d = getDrillBitEndpoint(host);
>> -        if (d != null ) ebs.put(d, bytes);
>> -      }
>> -    } catch (NullPointerException n) {}
>> -    entry.setEndpointBytes(ebs);
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) /
>> 1e6);
>> +
>> +    rowGroup.setEndpointBytes(endpointByteMap);
>> +    rowGroup.setMaxBytes(endpointByteMap.size() > 0 ?
>> Collections.max(endpointByteMap.values()) : 0);
>> +    logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(),
>> rowGroup.getStart(), rowGroup.getMaxBytes());
>> +    watch.stop();
>> +    logger.debug("Took {} ms to set endpoint bytes",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>>
>>    private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
>>      return endPointMap.get(hostName);
>>    }
>>
>> +  /**
>> +   * Builds a mapping of drillbit endpoints to hostnames
>> +   */
>>    private void buildEndpointMap() {
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      endPointMap = new HashMap<String, DrillbitEndpoint>();
>>      for (DrillbitEndpoint d : endpoints) {
>>        String hostName = d.getAddress();
>>        endPointMap.put(hostName, d);
>>      }
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) /
>> 1e6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to build endpoint map",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>>  }
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> index 9e48d33..64ced87 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
>> @@ -18,14 +18,13 @@
>>  package org.apache.drill.exec.store.parquet;
>>
>>  import java.io.IOException;
>> -import java.util.ArrayList;
>> -import java.util.Collection;
>> -import java.util.Collections;
>> -import java.util.Comparator;
>> -import java.util.HashMap;
>> -import java.util.LinkedList;
>> -import java.util.List;
>> +import java.util.*;
>> +import java.util.concurrent.TimeUnit;
>>
>> +import com.google.common.base.Stopwatch;
>> +import com.google.common.collect.ArrayListMultimap;
>> +import com.google.common.collect.Lists;
>> +import com.google.common.collect.Multimap;
>>  import org.apache.drill.common.config.DrillConfig;
>>  import org.apache.drill.exec.exception.SetupException;
>>  import org.apache.drill.exec.physical.EndpointAffinity;
>> @@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
>>  public class ParquetGroupScan extends AbstractGroupScan {
>>    static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
>>
>> -  private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
>> +  private ArrayListMultimap<Integer,
>> ParquetRowGroupScan.RowGroupReadEntry> mappings;
>>    private List<RowGroupInfo> rowGroupInfos;
>> +  private Stopwatch watch = new Stopwatch();
>>
>>    public List<ReadEntryWithPath> getEntries() {
>>      return entries;
>> @@ -110,16 +110,14 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>    }
>>
>>    private void readFooter() throws IOException {
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      rowGroupInfos = new ArrayList();
>>      long start = 0, length = 0;
>>      ColumnChunkMetaData columnChunkMetaData;
>>      for (ReadEntryWithPath readEntryWithPath : entries){
>>        Path path = new Path(readEntryWithPath.getPath());
>>        ParquetMetadata footer =
>> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
>> -//      FileSystem fs =
>> FileSystem.get(this.storageEngine.getHadoopConfig());
>> -//      FileStatus status = fs.getFileStatus(path);
>> -//      ParquetMetadata footer =
>> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
>>        readEntryWithPath.getPath();
>>
>>        int i = 0;
>> @@ -138,38 +136,21 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>          i++;
>>        }
>>      }
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to get row group infos", (float)(tB - tA) /
>> 1E6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to get row group infos",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>>
>>    private void calculateEndpointBytes() {
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      AffinityCalculator ac = new AffinityCalculator(fileName, fs,
>> availableEndpoints);
>>      for (RowGroupInfo e : rowGroupInfos) {
>>        ac.setEndpointBytes(e);
>>        totalBytes += e.getLength();
>>      }
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB -
>> tA) / 1E6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to calculate EndpointBytes",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>    }
>> -/*
>> -  public LinkedList<RowGroupInfo> getRowGroups() {
>> -    return rowGroups;
>> -  }
>> -
>> -  public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
>> -    this.rowGroups = rowGroups;
>> -  }
>> -
>> -  public static class ParquetFileReadEntry {
>> -
>> -    String path;
>> -
>> -    public ParquetFileReadEntry(@JsonProperty String path){
>> -      this.path = path;
>> -    }
>> -  }
>> -  */
>>
>>    @JsonIgnore
>>    public FileSystem getFileSystem() {
>> @@ -232,16 +213,22 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>      }
>>    }
>>
>> +  /**
>> +   *Calculates the affinity each endpoint has for this scan, by adding
>> up the affinity each endpoint has for each
>> +   * rowGroup
>> +   * @return a list of EndpointAffinity objects
>> +   */
>>    @Override
>>    public List<EndpointAffinity> getOperatorAffinity() {
>> -    long tA = System.nanoTime();
>> +    watch.reset();
>> +    watch.start();
>>      if (this.endpointAffinities == null) {
>>        HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
>>        for (RowGroupInfo entry : rowGroupInfos) {
>>          for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
>>            long bytes = entry.getEndpointBytes().get(d);
>>            float affinity = (float)bytes / (float)totalBytes;
>> -          logger.error("RowGroup: {} Endpoint: {} Bytes: {}",
>> entry.getRowGroupIndex(), d.getAddress(), bytes);
>> +          logger.debug("RowGroup: {} Endpoint: {} Bytes: {}",
>> entry.getRowGroupIndex(), d.getAddress(), bytes);
>>            if (affinities.keySet().contains(d)) {
>>              affinities.put(d, affinities.get(d) + affinity);
>>            } else {
>> @@ -256,83 +243,90 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>        }
>>        this.endpointAffinities = affinityList;
>>      }
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to get operator affinity", (float)(tB - tA)
>> / 1E6);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to get operator affinity",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>      return this.endpointAffinities;
>>    }
>>
>>
>> +  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
>>
>> -
>> +  /**
>> +   *
>> +   * @param incomingEndpoints
>> +   */
>>    @Override
>> -  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
>> -    long tA = System.nanoTime();
>> -    Preconditions.checkArgument(endpoints.size() <=
>> rowGroupInfos.size());
>> -
>> -    int i = 0;
>> -    for (DrillbitEndpoint endpoint : endpoints) {
>> -      logger.debug("Endpoint index {}, endpoint host: {}", i++,
>> endpoint.getAddress());
>> +  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints)
>> {
>> +    watch.reset();
>> +    watch.start();
>> +    Preconditions.checkArgument(incomingEndpoints.size() <=
>> rowGroupInfos.size());
>> +    mappings = ArrayListMultimap.create();
>> +    ArrayList rowGroupList = new ArrayList(rowGroupInfos);
>> +    List<DrillbitEndpoint> endpointLinkedlist =
>> Lists.newLinkedList(incomingEndpoints);
>> +    for(double cutoff : ASSIGNMENT_CUTOFFS ){
>> +      scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff,
>> false);
>>      }
>> -
>> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
>> -    mappings = new LinkedList[endpoints.size()];
>> -    LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints,
>> rowGroupInfos, 100, true, false);
>> -    LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints,
>> unassigned, 50, true, false);
>> -    LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints,
>> unassigned2, 25, true, false);
>> -    LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints,
>> unassigned3, 0, false, false);
>> -    LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints,
>> unassigned4, 0, false, true);
>> -    assert unassigned5.size() == 0 : String.format("All readEntries
>> should be assigned by now, but some are still unassigned");
>> -    long tB = System.nanoTime();
>> -    logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) /
>> 1E6);
>> +    scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
>> +    watch.stop();
>> +    logger.debug("Took {} ms to apply assignments",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>> +    Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries
>> should be assigned by now, but some are still unassigned");
>> +    Preconditions.checkArgument(!rowGroupInfos.isEmpty());
>>    }
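The multi-pass shape of applyAssignments() is easy to miss in the diff, so here is a hypothetical reduction of it (row groups collapsed to a single affinity score, names mine): one pass per cutoff in ASSIGNMENT_CUTOFFS, each relaxing the threshold, then a final unconditional pass for whatever is left.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of the descending-cutoff assignment strategy: greedy passes at
// 0.99, 0.50, 0.25, 0.01, then an "assign all" pass for the remainder.
public class CutoffPasses {
  static final double[] CUTOFFS = {0.99, 0.50, 0.25, 0.01};

  // Returns how many items each pass picked up; the last slot is the
  // unconditional final pass.
  public static int[] assignCounts(List<Double> affinities) {
    List<Double> remaining = new ArrayList<>(affinities);
    int[] counts = new int[CUTOFFS.length + 1];
    for (int pass = 0; pass < CUTOFFS.length; pass++) {
      for (Iterator<Double> it = remaining.iterator(); it.hasNext(); ) {
        if (it.next() >= CUTOFFS[pass]) {
          it.remove(); // mirrors iter.remove() in scanAndAssign
          counts[pass]++;
        }
      }
    }
    counts[CUTOFFS.length] = remaining.size();
    return counts;
  }
}
```

The point of the ordering is that work lands on the endpoints that hold most of its bytes before anything falls through to the affinity-free final pass.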
>>
>> -  private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint>
>> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean
>> mustContain, boolean assignAll) {
>> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
>> -    LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
>> -
>> -    int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() *
>> 1.5);
>> +  public int fragmentPointer = 0;
>> +
>> +  /**
>> +   * Scans the list of rowGroups and assigns them to minor fragments
>> that meet the given affinity threshold.
>> +   * @param endpointAssignments the mapping between fragment/endpoint
>> and rowGroup
>> +   * @param endpoints the list of drillbits, ordered by the
>> corresponding fragment
>> +   * @param rowGroups the list of rowGroups to assign
>> +   * @param requiredPercentage the percentage of max bytes required to
>> make an assignment
>> +   * @param assignAll if true, will assign even if no affinity
>> +   */
>> +  private void scanAndAssign (Multimap<Integer,
>> ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments,
>> List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double
>> requiredPercentage, boolean assignAll) {
>> +    Collections.sort(rowGroups, new ParquetReadEntryComparator());
>> +    final boolean requireAffinity = requiredPercentage > 0;
>> +    int maxAssignments = (int) (rowGroups.size() / endpoints.size());
>> +
>> +    if (maxAssignments < 1) maxAssignments = 1;
>> +
>> +    for(Iterator<RowGroupInfo> iter = rowGroups.iterator();
>> iter.hasNext();){
>> +      RowGroupInfo rowGroupInfo = iter.next();
>> +      for (int i = 0; i < endpoints.size(); i++) {
>> +        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
>> +        DrillbitEndpoint currentEndpoint =
>> endpoints.get(minorFragmentId);
>> +        Map<DrillbitEndpoint, Long> bytesPerEndpoint =
>> rowGroupInfo.getEndpointBytes();
>> +        boolean haveAffinity =
>> bytesPerEndpoint.containsKey(currentEndpoint) ;
>>
>> -    if (maxEntries < 1) maxEntries = 1;
>> -
>> -    int i =0;
>> -    for(RowGroupInfo e : rowGroups) {
>> -      boolean assigned = false;
>> -      for (int j = i; j < i + endpoints.size(); j++) {
>> -        DrillbitEndpoint currentEndpoint =
>> endpoints.get(j%endpoints.size());
>>          if (assignAll ||
>> -                (e.getEndpointBytes().size() > 0 &&
>> -                (e.getEndpointBytes().containsKey(currentEndpoint) ||
>> !mustContain) &&
>> -                (mappings[j%endpoints.size()] == null ||
>> mappings[j%endpoints.size()].size() < maxEntries) &&
>> -                e.getEndpointBytes().get(currentEndpoint) >=
>> e.getMaxBytes() * requiredPercentage / 100)) {
>> -          LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries =
>> mappings[j%endpoints.size()];
>> -          if(entries == null){
>> -            entries = new
>> LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
>> -            mappings[j%endpoints.size()] = entries;
>> -          }
>> -          entries.add(e.getRowGroupReadEntry());
>> -          logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}",
>> e.getPath(), e.getStart(), currentEndpoint.getAddress());
>> -          assigned = true;
>> +                (!bytesPerEndpoint.isEmpty() &&
>> +                        (!requireAffinity || haveAffinity) &&
>> +                        (!endpointAssignments.containsKey(minorFragmentId) ||
>> endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
>> +                        bytesPerEndpoint.get(currentEndpoint) >=
>> rowGroupInfo.getMaxBytes() * requiredPercentage)) {
>> +
>> +          endpointAssignments.put(minorFragmentId,
>> rowGroupInfo.getRowGroupReadEntry());
>> +          logger.debug("Assigned rowGroup {} to minorFragmentId {}
>> endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId,
>> endpoints.get(minorFragmentId).getAddress());
>> +          iter.remove();
>> +          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
>>            break;
>>          }
>>        }
>> -      if (!assigned) unassigned.add(e);
>> -      i++;
>> +
>>      }
>> -    return unassigned;
>>    }
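Replacing the old `LinkedList[]` mappings array with Guava's `ArrayListMultimap<Integer, RowGroupReadEntry>` removes the null-check-then-allocate dance. For anyone without Guava at hand, the same shape is a one-liner on the JDK with computeIfAbsent (this sketch uses String stand-ins for RowGroupReadEntry and names of my own):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// JDK-only model of the Multimap used for mappings:
// minor fragment id -> list of read entries assigned to it.
public class FragmentMappings {
  private final Map<Integer, List<String>> mappings = new HashMap<>();

  // Equivalent of ArrayListMultimap.put(key, value): the list is
  // created lazily on first assignment for that fragment.
  public void assign(int minorFragmentId, String readEntry) {
    mappings.computeIfAbsent(minorFragmentId, k -> new ArrayList<>())
            .add(readEntry);
  }

  public List<String> entriesFor(int minorFragmentId) {
    return mappings.getOrDefault(minorFragmentId, List.of());
  }
}
```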
>>
>>    @Override
>>    public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
>> -    assert minorFragmentId < mappings.length : String.format("Mappings
>> length [%d] should be longer than minor fragment id [%d] but it isn't.",
>> mappings.length, minorFragmentId);
>> -    for (ParquetRowGroupScan.RowGroupReadEntry rg :
>> mappings[minorFragmentId]) {
>> +    assert minorFragmentId < mappings.size() : String.format("Mappings
>> length [%d] should be longer than minor fragment id [%d] but it isn't.",
>> mappings.size(), minorFragmentId);
>> +    for (ParquetRowGroupScan.RowGroupReadEntry rg :
>> mappings.get(minorFragmentId)) {
>>        logger.debug("minorFragmentId: {} Path: {} RowGroupIndex:
>> {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
>>      }
>> +    Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(),
>> String.format("MinorFragmentId %d has no read entries assigned",
>> minorFragmentId));
>>      try {
>> -      return new ParquetRowGroupScan(storageEngine, engineConfig,
>> mappings[minorFragmentId]);
>> +      return new ParquetRowGroupScan(storageEngine, engineConfig,
>> mappings.get(minorFragmentId));
>>      } catch (SetupException e) {
>> -      e.printStackTrace(); // TODO - fix this
>> +      throw new RuntimeException("Error setting up ParquetRowGroupScan",
>> e);
>>      }
>> -    return null;
>>    }
>>
>>    @Override
>> @@ -342,7 +336,8 @@ public class ParquetGroupScan extends
>> AbstractGroupScan {
>>
>>    @Override
>>    public OperatorCost getCost() {
>> -    return new OperatorCost(1,1,1,1);
>> +    //TODO Figure out how to properly calculate cost
>> +    return new OperatorCost(1,rowGroupInfos.size(),1,1);
>>    }
>>
>>    @Override
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> index 4e46034..3aaa987 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
>> @@ -211,8 +211,8 @@ public class ParquetRecordReader implements
>> RecordReader {
>>        }
>>        for (VarLenBinaryReader.VarLengthColumn r :
>> varLengthReader.columns) {
>>          output.addField(r.valueVecHolder.getValueVector());
>> -        output.setNewSchema();
>>        }
>> +      output.setNewSchema();
>>      }catch(SchemaChangeException e) {
>>        throw new ExecutionSetupException("Error setting up output
>> mutator.", e);
>>      }
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> index 03fb4ec..addd288 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
>> @@ -21,7 +21,9 @@ import java.io.IOException;
>>  import java.text.SimpleDateFormat;
>>  import java.util.Date;
>>  import java.util.List;
>> +import java.util.concurrent.TimeUnit;
>>
>> +import com.google.common.base.Stopwatch;
>>  import org.apache.drill.common.exceptions.ExecutionSetupException;
>>  import org.apache.drill.exec.ops.FragmentContext;
>>  import org.apache.drill.exec.physical.impl.BatchCreator;
>> @@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
>>  import parquet.hadoop.metadata.ParquetMetadata;
>>
>>  public class ParquetScanBatchCreator implements
>> BatchCreator<ParquetRowGroupScan>{
>> -  static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
>> +  static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
>>
>>    @Override
>>    public RecordBatch getBatch(FragmentContext context,
>> ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws
>> ExecutionSetupException {
>> -    long tA = System.nanoTime(), tB;
>> -    System.out.println( new SimpleDateFormat("mm:ss S").format(new
>> Date()) + " :Start of ScanBatCreator.scanBatch");
>> +    Stopwatch watch = new Stopwatch();
>> +    watch.start();
>>      Preconditions.checkArgument(children.isEmpty());
>>      List<RecordReader> readers = Lists.newArrayList();
>>      for(ParquetRowGroupScan.RowGroupReadEntry e :
>> rowGroupScan.getRowGroupReadEntries()){
>> @@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements
>> BatchCreator<ParquetRowGroupScan
>>          throw new ExecutionSetupException(e1);
>>        }
>>      }
>> -    System.out.println( "Total time in method: " + ((float)
>> (System.nanoTime() - tA) / 1e9));
>> +    logger.debug("total time in ScanBatchCreator.getBatch: {} ms",
>> watch.elapsed(TimeUnit.MILLISECONDS));
>>      return new ScanBatch(context, readers.iterator());
>>    }
>>  }
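The raw System.nanoTime() arithmetic is replaced throughout by Guava's Stopwatch, which keeps the unit conversion out of the call sites. As a reference for the subset of the API the patch relies on, here is a minimal JDK-only stand-in (not the real Guava class, just the reset/start/stop/elapsed surface used above):

```java
import java.util.concurrent.TimeUnit;

// Minimal, hypothetical stand-in for the slice of Guava's Stopwatch API
// used in the patch. Accumulates nanoseconds across start/stop cycles.
public class SimpleStopwatch {
  private long startNanos;
  private long elapsedNanos;
  private boolean running;

  public SimpleStopwatch start() {
    startNanos = System.nanoTime();
    running = true;
    return this;
  }

  public SimpleStopwatch stop() {
    elapsedNanos += System.nanoTime() - startNanos;
    running = false;
    return this;
  }

  public SimpleStopwatch reset() {
    elapsedNanos = 0;
    running = false;
    return this;
  }

  public long elapsed(TimeUnit unit) {
    long total = running
        ? elapsedNanos + (System.nanoTime() - startNanos)
        : elapsedNanos;
    return unit.convert(total, TimeUnit.NANOSECONDS);
  }
}
```

Note the reset()/start() pair at the top of each timed method in the patch: a Stopwatch reused as a field must be reset, or elapsed times accumulate across calls.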
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> index 9a33109..72c5f34 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
>> @@ -35,8 +35,8 @@ public class ErrorHelper {
>>      if(message != null){
>>        sb.append(message);
>>      }
>> -
>> -    do{
>> +
>> +    while (true) {
>>        sb.append(" < ");
>>        sb.append(t.getClass().getSimpleName());
>>        if(t.getMessage() != null){
>> @@ -44,7 +44,9 @@ public class ErrorHelper {
>>          sb.append(t.getMessage());
>>          sb.append(" ]");
>>        }
>> -    }while(t.getCause() != null && t.getCause() != t);
>> +      if (t.getCause() == null || t.getCause() == t) break;
>> +      t = t.getCause();
>> +    }
>>
>>      builder.setMessage(sb.toString());
>>
>>
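The ErrorHelper change fixes a real bug: the old do/while tested `t.getCause()` but never advanced `t`, so only the outermost exception was ever appended (and the loop could spin on a non-null cause). A self-contained sketch of the corrected walk, simplified to return the chain as a string rather than append to a message builder:

```java
// Corrected cause-chain traversal: append each throwable in the chain,
// stopping on a null cause or a self-referential cause (cycle guard).
public class CauseChain {
  public static String describe(Throwable t) {
    StringBuilder sb = new StringBuilder();
    while (true) {
      sb.append(" < ").append(t.getClass().getSimpleName());
      if (t.getMessage() != null) {
        sb.append(" [ ").append(t.getMessage()).append(" ]");
      }
      if (t.getCause() == null || t.getCause() == t) break;
      t = t.getCause(); // the step the original do/while was missing
    }
    return sb.toString();
  }
}
```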
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> index e2a00f1..18ac294 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java
>> @@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan;
>>  import org.apache.drill.exec.planner.PhysicalPlanReader;
>>  import org.apache.drill.exec.proto.CoordinationProtos;
>>  import org.apache.drill.exec.proto.UserProtos;
>> +import org.apache.drill.exec.record.RecordBatchLoader;
>> +import org.apache.drill.exec.record.VectorWrapper;
>> +import org.apache.drill.exec.rpc.RpcException;
>>  import org.apache.drill.exec.rpc.user.QueryResultBatch;
>> +import org.apache.drill.exec.rpc.user.UserResultsListener;
>> +import org.apache.drill.exec.server.BootStrapContext;
>>  import org.apache.drill.exec.server.Drillbit;
>>  import org.apache.drill.exec.server.RemoteServiceSet;
>>  import org.apache.drill.exec.store.parquet.ParquetGroupScan;
>> +import org.apache.drill.exec.vector.ValueVector;
>>  import org.apache.hadoop.fs.BlockLocation;
>>  import org.apache.hadoop.fs.FileStatus;
>>  import org.apache.hadoop.fs.FileSystem;
>> @@ -29,6 +35,7 @@ import java.io.IOException;
>>  import java.nio.charset.Charset;
>>  import java.util.LinkedList;
>>  import java.util.List;
>> +import java.util.concurrent.CountDownLatch;
>>
>>  import static junit.framework.Assert.assertNull;
>>  import static org.junit.Assert.assertEquals;
>> @@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan {
>>
>>    //public String fileName = "/physical_test2.json";
>>    public String fileName = "parquet_scan_union_screen_physical.json";
>> +//  public String fileName = "parquet-sample.json";
>> +
>>
>>    @Test
>>    @Ignore
>> @@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan {
>>        bit1.run();
>>        client.connect();
>>        List<QueryResultBatch> results =
>> client.runQuery(UserProtos.QueryType.PHYSICAL,
>> Resources.toString(Resources.getResource(fileName),Charsets.UTF_8));
>> -      System.out.println(String.format("Got %d results",
>> results.size()));
>> +      RecordBatchLoader loader = new
>> RecordBatchLoader(bit1.getContext().getAllocator());
>> +      for (QueryResultBatch b : results) {
>> +        System.out.println(String.format("Got %d results",
>> b.getHeader().getRowCount()));
>> +        loader.load(b.getHeader().getDef(), b.getData());
>> +        for (VectorWrapper vw : loader) {
>> +          System.out.println(vw.getValueVector().getField().getName());
>> +          ValueVector vv = vw.getValueVector();
>> +          for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
>> +            Object o = vv.getAccessor().getObject(i);
>> +            System.out.println(vv.getAccessor().getObject(i));
>> +          }
>> +        }
>> +      }
>> +      client.close();
>> +    }
>> +  }
>> +
>> +  private class ParquetResultsListener implements UserResultsListener {
>> +    private CountDownLatch latch = new CountDownLatch(1);
>> +    @Override
>> +    public void submissionFailed(RpcException ex) {
>> +      logger.error("submission failed", ex);
>> +      latch.countDown();
>> +    }
>> +
>> +    @Override
>> +    public void resultArrived(QueryResultBatch result) {
>> +      System.out.printf("Result batch arrived. Number of records: %d",
>> result.getHeader().getRowCount());
>> +      if (result.getHeader().getIsLastChunk()) latch.countDown();
>> +    }
>> +
>> +    public void await() throws Exception {
>> +      latch.await();
>> +    }
>> +  }
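The ParquetResultsListener above is the standard latch pattern for bridging an async callback API to a blocking test. A generic model of it (this one does not implement Drill's UserResultsListener; names are mine), with the important detail that a failure must also release the latch or the test hangs forever:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Latch-based listener: the callback thread counts down when the last
// chunk (or a failure) arrives; the test thread blocks in await().
public class LatchListener {
  private final CountDownLatch latch = new CountDownLatch(1);

  public void onResult(boolean isLastChunk) {
    if (isLastChunk) latch.countDown();
  }

  public void onFailure(Exception e) {
    latch.countDown(); // release the waiter so the test fails fast
  }

  public boolean isDone() {
    return latch.getCount() == 0;
  }

  public boolean await(long timeout, TimeUnit unit)
      throws InterruptedException {
    return latch.await(timeout, unit);
  }
}
```

A bounded await(timeout, unit) is generally safer in tests than the untimed latch.await() used in the patch, which can wedge the build on a lost result.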
>> +  @Test
>> +  @Ignore
>> +  public void testParseParquetPhysicalPlanRemote() throws Exception {
>> +    DrillConfig config = DrillConfig.create();
>> +
>> +    try(DrillClient client = new DrillClient(config);){
>> +      client.connect();
>> +      ParquetResultsListener listener = new ParquetResultsListener();
>> +      client.runQuery(UserProtos.QueryType.PHYSICAL,
>> Resources.toString(Resources.getResource(fileName),Charsets.UTF_8),
>> listener);
>> +      listener.await();
>>        client.close();
>>      }
>>    }
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> index 1d91455..7a99c3f 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> +++
>> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
>> @@ -48,6 +48,7 @@ import
>> org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo;
>>  import org.apache.drill.exec.vector.BaseDataValueVector;
>>  import org.apache.drill.exec.vector.ValueVector;
>>  import org.junit.BeforeClass;
>> +import org.junit.Ignore;
>>  import org.junit.Test;
>>
>>  import parquet.bytes.BytesInput;
>> @@ -68,6 +69,7 @@ public class ParquetRecordReaderTest {
>>    static final org.slf4j.Logger logger =
>> org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class);
>>
>>    private boolean VERBOSE_DEBUG = false;
>> +  private boolean checkValues = true;
>>
>>    static final int numberRowGroups = 20;
>>    static final int recordsPerRowGroup = 300000;
>> @@ -100,6 +102,20 @@ public class ParquetRecordReaderTest {
>>      testParquetFullEngineLocalText(planText, fileName, i,
>> numberRowGroups, recordsPerRowGroup);
>>    }
>>
>> +  @Test
>> +  @Ignore
>> +  public void testLocalDistributed() throws Exception {
>> +    String planName = "/parquet/parquet_scan_union_screen_physical.json";
>> +    testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20,
>> 300000);
>> +  }
>> +
>> +  @Test
>> +  @Ignore
>> +  public void testRemoteDistributed() throws Exception {
>> +    String planName = "/parquet/parquet_scan_union_screen_physical.json";
>> +    testParquetFullEngineRemote(planName, fileName, 1, 10, 30000);
>> +  }
>> +
>>
>>    private class ParquetResultListener implements UserResultsListener {
>>      private SettableFuture<Void> future = SettableFuture.create();
>> @@ -155,8 +171,12 @@ public class ParquetRecordReaderTest {
>>            if (VERBOSE_DEBUG){
>>              System.out.print(vv.getAccessor().getObject(j) + ", " + (j %
>> 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : ""));
>>            }
>> -          assertField(vv, j, (TypeProtos.MinorType) currentField.type,
>> -              currentField.values[(int) (columnValCounter % 3)],
>> (String) currentField.name + "/");
>> +          if (checkValues) {
>> +            try {
>> +              assertField(vv, j, (TypeProtos.MinorType)
>> currentField.type,
>> +                currentField.values[(int) (columnValCounter % 3)],
>> (String) currentField.name + "/");
>> +            } catch (AssertionError e) { submissionFailed(new
>> RpcException(e)); }
>> +          }
>>            columnValCounter++;
>>          }
>>          if (VERBOSE_DEBUG){
>> @@ -197,7 +217,9 @@ public class ParquetRecordReaderTest {
>>        batchCounter++;
>>        if(result.getHeader().getIsLastChunk()){
>>          for (String s : valuesChecked.keySet()) {
>> +          try {
>>            assertEquals("Record count incorrect for column: " + s,
>> totalRecords, (long) valuesChecked.get(s));
>> +          } catch (AssertionError e) { submissionFailed(new
>> RpcException(e)); }
>>          }
>>
>>          assert valuesChecked.keySet().size() > 0;
>> @@ -222,11 +244,13 @@ public class ParquetRecordReaderTest {
>>
>>      DrillConfig config = DrillConfig.create();
>>
>> +    checkValues = false;
>> +
>>      try(DrillClient client = new DrillClient(config);){
>>        client.connect();
>>        RecordBatchLoader batchLoader = new
>> RecordBatchLoader(client.getAllocator());
>>        ParquetResultListener resultListener = new
>> ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups,
>> numberOfTimesRead);
>> -      client.runQuery(UserProtos.QueryType.LOGICAL,
>> Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8),
>> resultListener);
>> +      client.runQuery(UserProtos.QueryType.PHYSICAL,
>> Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8),
>> resultListener);
>>        resultListener.get();
>>      }
>>
>> @@ -259,6 +283,28 @@ public class ParquetRecordReaderTest {
>>    }
>>
>>
>> +  //use this method to submit physical plan
>> +  public void testParquetFullEngineLocalTextDistributed(String planName,
>> String filename, int numberOfTimesRead /* specified in json plan */, int
>> numberOfRowGroups, int recordsPerRowGroup) throws Exception{
>> +
>> +    RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
>> +
>> +    checkValues = false;
>> +
>> +    DrillConfig config = DrillConfig.create();
>> +
>> +    try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient
>> client = new DrillClient(config, serviceSet.getCoordinator());){
>> +      bit1.run();
>> +      client.connect();
>> +      RecordBatchLoader batchLoader = new
>> RecordBatchLoader(client.getAllocator());
>> +      ParquetResultListener resultListener = new
>> ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups,
>> numberOfTimesRead);
>> +      Stopwatch watch = new Stopwatch().start();
>> +      client.runQuery(UserProtos.QueryType.PHYSICAL,
>> Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8),
>> resultListener);
>> +      resultListener.get();
>> +      System.out.println(String.format("Took %d ms to run query",
>> watch.elapsed(TimeUnit.MILLISECONDS)));
>> +
>> +    }
>> +
>> +  }
>>
>>    public String pad(String value, int length) {
>>      return pad(value, length, " ");
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> ----------------------------------------------------------------------
>> diff --git
>> a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> index f508d09..5efecaf 100644
>> ---
>> a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> +++
>> b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json
>> @@ -11,10 +11,7 @@
>>      @id : 1,
>>      entries : [
>>      {
>> -        path : "/tmp/testParquetFile_many_types_3"
>> -    },
>> -    {
>> -        path : "/tmp/testParquetFile_many_types_3"
>> +        path : "/tmp/parquet_test_file_many_types"
>>      }
>>      ],
>>      storageengine:{
>>
>>



-- 
Regards,
Tanujit
