A more detailed output: I just ran the particular test this time. [tanujit@legolas java-exec]$ mvn -Dtest=ParquetRecordReaderTest test [INFO] Scanning for projects... [INFO]
[INFO] ------------------------------------------------------------------------ [INFO] Building java-exec 1.0-SNAPSHOT [INFO] ------------------------------------------------------------------------ [INFO] [INFO] --- maven-resources-plugin:2.6:copy-resources (copy-resources) @ java-exec --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] Copying 369 resources [INFO] [INFO] --- maven-enforcer-plugin:1.2:enforce (no_commons_logging) @ java-exec --- [INFO] [INFO] --- maven-antrun-plugin:1.6:run (generate-sources) @ java-exec --- [WARNING] Parameter tasks is deprecated, use target instead [INFO] Executing tasks main: [INFO] Executed tasks [INFO] Registering compile source root /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/generated-sources [INFO] [INFO] --- fmpp-maven-plugin:1.0:generate (generate-sources) @ java-exec --- - Executing: ValueHolders.java log4j:WARN No appenders could be found for logger (freemarker.cache). log4j:WARN Please initialize the log4j system properly. - Executing: VariableLengthVectors.java - Executing: FixedValueVectors.java - Executing: TypeHelper.java - Executing: NullableValueVectors.java - Executing: RepeatedValueVectors.java [INFO] Done [INFO] [INFO] --- maven-remote-resources-plugin:1.1:process (default) @ java-exec --- [INFO] Setting property: classpath.resource.loader.class => 'org.codehaus.plexus.velocity.ContextClassLoaderResourceLoader'. [INFO] Setting property: velocimacro.messages.on => 'false'. [INFO] Setting property: resource.loader => 'classpath'. [INFO] Setting property: resource.manager.logwhenfound => 'false'. [INFO] [INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ java-exec --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] Copying 1 resource [INFO] Copying 3 resources [INFO] [INFO] --- maven-compiler-plugin:3.0:compile (default-compile) @ java-exec --- [INFO] Changes detected - recompiling the module! 
[INFO] Compiling 459 source files to /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/classes [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetStorageEngine.java:[20,21] com.sun.corba.se.impl.interceptors.CodecFactoryImpl is internal proprietary API and may be removed in a future release [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java: Some input files use or override a deprecated API. [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/bit/BitConnection.java: Recompile with -Xlint:deprecation for details. [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java: Some input files use unchecked or unsafe operations. [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java: Recompile with -Xlint:unchecked for details. [INFO] [INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ java-exec --- [INFO] Using 'UTF-8' encoding to copy filtered resources. [INFO] Copying 36 resources [INFO] Copying 3 resources [INFO] [INFO] --- maven-compiler-plugin:3.0:testCompile (default-testCompile) @ java-exec --- [INFO] Changes detected - recompiling the module! 
[INFO] Compiling 126 source files to /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/test-classes [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[25,16] sun.misc.Unsafe is internal proprietary API and may be removed in a future release [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[53,17] sun.misc.Unsafe is internal proprietary API and may be removed in a future release [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[94,12] sun.misc.Unsafe is internal proprietary API and may be removed in a future release [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[95,14] sun.misc.Unsafe is internal proprietary API and may be removed in a future release [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[98,26] sun.misc.Unsafe is internal proprietary API and may be removed in a future release [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestExecutionAbstractions.java:[100,27] sun.misc.Unsafe is internal proprietary API and may be removed in a future release [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java: Some input files use or override a deprecated API. 
[WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java: Recompile with -Xlint:deprecation for details. [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java: Some input files use unchecked or unsafe operations. [WARNING] /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestFileGenerator.java: Recompile with -Xlint:unchecked for details. [INFO] [INFO] --- maven-surefire-plugin:2.15:test (default-test) @ java-exec --- [INFO] Surefire report directory: /backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/surefire-reports ------------------------------------------------------- T E S T S ------------------------------------------------------- Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest 08:18:15.598 [main] DEBUG org.reflections.Reflections - going to scan these urls: jar:file:/home/tanujit/.m2/repository/org/apache/drill/common/1.0-SNAPSHOT/common-1.0-SNAPSHOT.jar!/ file:/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/test-classes/ file:/backup/java/wip/incubator-drill/sandbox/prototype/exec/java-exec/target/classes/ jar:file:/home/tanujit/.m2/repository/org/apache/drill/common/1.0-SNAPSHOT/common-1.0-SNAPSHOT-tests.jar!/ 08:18:16.124 [main] INFO org.reflections.Reflections - Reflections took 524 ms to scan 4 urls, producing 667 keys and 1620 values 08:18:16.148 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework 08:18:16.150 [main] DEBUG i.n.c.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 8 08:18:16.165 [main] DEBUG i.n.util.internal.PlatformDependent - UID: 1000 08:18:16.165 [main] DEBUG i.n.util.internal.PlatformDependent - Java version: 7 08:18:16.166 [main] DEBUG i.n.util.internal.PlatformDependent 
- -Dio.netty.noUnsafe: false 08:18:16.167 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.ByteBuffer.cleaner: available 08:18:16.167 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Buffer.address: available 08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available 08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available 08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: true 08:18:16.168 [main] DEBUG i.n.util.internal.PlatformDependent - sun.misc.Unsafe: available 08:18:16.169 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noJavassist: false 08:18:16.227 [main] DEBUG i.n.util.internal.PlatformDependent - Javassist: available 08:18:16.228 [main] DEBUG i.n.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false 08:18:16.249 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.noKeySetOptimization: false 08:18:16.249 [main] DEBUG io.netty.channel.nio.NioEventLoop - -Dio.netty.selectorAutoRebuildThreshold: 512 08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL - -Dio.netty.allocator.numHeapArenas: 4 08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL - -Dio.netty.allocator.numDirectArenas: 4 08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL - -Dio.netty.allocator.pageSize: 8192 08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL - -Dio.netty.allocator.maxOrder: 11 08:18:16.265 [main] DEBUG i.n.buffer.PooledByteBufAllocatorL - -Dio.netty.allocator.chunkSize: 16777216 08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback interface: lo 08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback address: /0:0:0:0:0:0:0:1%1 (primary) 08:18:16.335 [main] DEBUG io.netty.util.NetUtil - Loopback address: / 127.0.0.1 08:18:16.335 [main] DEBUG io.netty.util.NetUtil - /proc/sys/net/core/somaxconn: 128 08:18:16.344 [UserServer-1] INFO 
i.n.handler.logging.LoggingHandler - [id: 0x30e3b962] REGISTERED 08:18:16.347 [UserServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x30e3b962] BIND(0.0.0.0/0.0.0.0:31010) 08:18:16.350 [UserServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x30e3b962, /0:0:0:0:0:0:0:0:31010] ACTIVE 08:18:16.366 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b] REGISTERED 08:18:16.367 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b] BIND(0.0.0.0/0.0.0.0:31011) 08:18:16.367 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] ACTIVE 08:18:16.571 [Client-1] DEBUG i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated: io.netty.util.internal.__matchers__.io.netty.buffer.ByteBufMatcher 08:18:16.574 [Client-1] DEBUG i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated: io.netty.util.internal.__matchers__.org.apache.drill.exec.rpc.OutboundRpcMessageMatcher 08:18:16.576 [Client-1] DEBUG i.n.u.i.JavassistTypeParameterMatcherGenerator - Generated: io.netty.util.internal.__matchers__.org.apache.drill.exec.rpc.InboundRpcMessageMatcher 08:18:16.579 [UserServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x30e3b962, /0:0:0:0:0:0:0:0:31010] RECEIVED: [id: 0xf33275f9, / 192.168.0.102:34995 => /192.168.0.102:31010] 08:18:16.589 [Client-1] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.noResourceLeakDetection: false 08:18:16.770 [WorkManager-1] DEBUG org.apache.hadoop.security.Groups - Creating new Groups object 08:18:16.817 [WorkManager-1] DEBUG org.apache.hadoop.security.Groups - Group mapping impl=org.apache.hadoop.security.ShellBasedUnixGroupsMapping; cacheTimeout=300000 08:18:16.882 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation - hadoop login 08:18:16.882 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation - hadoop login commit 08:18:16.898 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation - using local user:UnixPrincipal: tanujit 
08:18:16.900 [WorkManager-1] DEBUG o.a.h.security.UserGroupInformation - UGI loginUser:tanujit 08:18:16.940 [WorkManager-1] DEBUG org.apache.hadoop.fs.FileSystem - Creating filesystem for file:/// SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 08:18:17.922 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x1f148baa, / 192.168.0.102:45778 => /192.168.0.102:31011] 08:18:17.923 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xc4ed7542, / 192.168.0.102:45779 => /192.168.0.102:31011] 08:18:17.937 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x70a4a44c, / 192.168.0.102:45780 => /192.168.0.102:31011] 08:18:17.937 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xf6360725, / 192.168.0.102:45781 => /192.168.0.102:31011] 08:18:17.937 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x51467583, / 192.168.0.102:45782 => /192.168.0.102:31011] 08:18:17.938 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xaf6d72df, / 192.168.0.102:45783 => /192.168.0.102:31011] 08:18:17.938 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x6a8a1670, / 192.168.0.102:45784 => /192.168.0.102:31011] 08:18:17.938 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0xf18205a3, / 192.168.0.102:45785 => /192.168.0.102:31011] 08:18:17.939 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: 
[id: 0xa09abc2c, / 192.168.0.102:45786 => /192.168.0.102:31011] 08:18:17.943 [BitServer-1] INFO i.n.handler.logging.LoggingHandler - [id: 0x8ebafd5b, /0:0:0:0:0:0:0:0:31011] RECEIVED: [id: 0x3784f526, / 192.168.0.102:45787 => /192.168.0.102:31011] On Sat, Aug 24, 2013 at 8:15 AM, Tanujit Ghosh <[email protected]> wrote: > Hi, > > when I try mvn install after these changes, the > org.apache.drill.exec.store.parquet.ParquetRecordReaderTest is hanging. > > Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.596 sec > - in org.apache.drill.exec.expr.ExpressionTest > Running org.apache.drill.exec.store.TestAffinityCalculator > Took 0.616287 ms to build range map > Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.137 sec > - in org.apache.drill.exec.store.TestAffinityCalculator > Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". > SLF4J: Defaulting to no-operation (NOP) logger implementation > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further > details. > > Environment is Fedora 18, OpenJDK 1.7 > > With skipTests, everything compiles fine. > > Regards > Tanujit > > > > On Fri, Aug 23, 2013 at 5:36 AM, <[email protected]> wrote: > >> DRILL-176: Updates to affinity calculator, fixes for parquet >> serialization. 
Fix to ErrorHelper looping >> >> >> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo >> Commit: >> http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617 >> Tree: >> http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617 >> Diff: >> http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617 >> >> Branch: refs/heads/master >> Commit: 7edd36170a9be291a69e44f6090474193485bf14 >> Parents: d6ae53e >> Author: Steven Phillips <[email protected]> >> Authored: Thu Aug 22 16:18:55 2013 -0700 >> Committer: Jacques Nadeau <[email protected]> >> Committed: Thu Aug 22 16:18:55 2013 -0700 >> >> ---------------------------------------------------------------------- >> .../drill/exec/planner/fragment/Wrapper.java | 5 +- >> .../drill/exec/store/AffinityCalculator.java | 91 ++++++---- >> .../exec/store/parquet/ParquetGroupScan.java | 177 +++++++++---------- >> .../exec/store/parquet/ParquetRecordReader.java | 2 +- >> .../store/parquet/ParquetScanBatchCreator.java | 10 +- >> .../drill/exec/work/foreman/ErrorHelper.java | 8 +- >> .../exec/store/TestParquetPhysicalPlan.java | 55 +++++- >> .../store/parquet/ParquetRecordReaderTest.java | 52 +++++- >> .../parquet_scan_union_screen_physical.json | 5 +- >> 9 files changed, 257 insertions(+), 148 deletions(-) >> ---------------------------------------------------------------------- >> >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java >> ---------------------------------------------------------------------- >> diff --git >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java >> index d5a24b0..8c4b0b4 100644 >> --- >> 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java >> +++ >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java >> @@ -151,15 +151,12 @@ public class Wrapper { >> for (int i = start; i < start + width; i++) { >> endpoints.add(all.get(i % div)); >> } >> - } else if (values.size() < width) { >> - throw new NotImplementedException( >> - "Haven't implemented a scenario where we have some node >> affinity but the affinity list is smaller than the expected width."); >> } else { >> // get nodes with highest affinity. >> Collections.sort(values); >> values = Lists.reverse(values); >> for (int i = 0; i < width; i++) { >> - endpoints.add(values.get(i).getEndpoint()); >> + endpoints.add(values.get(i%values.size()).getEndpoint()); >> } >> } >> >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java >> ---------------------------------------------------------------------- >> diff --git >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java >> index b4092cc..b341ea4 100644 >> --- >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java >> +++ >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java >> @@ -1,6 +1,7 @@ >> package org.apache.drill.exec.store; >> >> >> +import com.google.common.base.Stopwatch; >> import com.google.common.collect.ImmutableRangeMap; >> import com.google.common.collect.Range; >> import org.apache.drill.exec.store.parquet.ParquetGroupScan; >> @@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path; >> >> import java.io.IOException; >> import java.util.*; >> +import java.util.concurrent.TimeUnit; >> 
>> public class AffinityCalculator { >> static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class); >> @@ -24,6 +26,7 @@ public class AffinityCalculator { >> String fileName; >> Collection<DrillbitEndpoint> endpoints; >> HashMap<String,DrillbitEndpoint> endPointMap; >> + Stopwatch watch = new Stopwatch(); >> >> public AffinityCalculator(String fileName, FileSystem fs, >> Collection<DrillbitEndpoint> endpoints) { >> this.fs = fs; >> @@ -33,16 +36,20 @@ public class AffinityCalculator { >> buildEndpointMap(); >> } >> >> + /** >> + * Builds a mapping of block locations to file byte range >> + */ >> private void buildBlockMap() { >> try { >> + watch.start(); >> FileStatus file = fs.getFileStatus(new Path(fileName)); >> - long tC = System.nanoTime(); >> blocks = fs.getFileBlockLocations(file, 0 , file.getLen()); >> - long tD = System.nanoTime(); >> + watch.stop(); >> logger.debug("Block locations: {}", blocks); >> - logger.debug("Took {} ms to get Block locations", (float)(tD - tC) >> / 1e6); >> + logger.debug("Took {} ms to get Block locations", >> watch.elapsed(TimeUnit.MILLISECONDS)); >> } catch (IOException ioe) { throw new RuntimeException(ioe); } >> - long tA = System.nanoTime(); >> + watch.reset(); >> + watch.start(); >> ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new >> ImmutableRangeMap.Builder<Long,BlockLocation>(); >> for (BlockLocation block : blocks) { >> long start = block.getOffset(); >> @@ -51,62 +58,72 @@ public class AffinityCalculator { >> blockMapBuilder = blockMapBuilder.put(range, block); >> } >> blockMap = blockMapBuilder.build(); >> - long tB = System.nanoTime(); >> - logger.debug("Took {} ms to build block map", (float)(tB - tA) / >> 1e6); >> + watch.stop(); >> + logger.debug("Took {} ms to build block map", >> watch.elapsed(TimeUnit.MILLISECONDS)); >> } >> /** >> + * For a given RowGroup, calculate how many bytes are available on >> each on drillbit endpoint >> * >> - * @param 
entry >> + * @param rowGroup the RowGroup to calculate endpoint bytes for >> */ >> - public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) { >> - long tA = System.nanoTime(); >> + public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) { >> + watch.reset(); >> + watch.start(); >> HashMap<String,Long> hostMap = new HashMap<>(); >> - long start = entry.getStart(); >> - long end = start + entry.getLength(); >> - Range<Long> entryRange = Range.closedOpen(start, end); >> - ImmutableRangeMap<Long,BlockLocation> subRangeMap = >> blockMap.subRangeMap(entryRange); >> - for (Map.Entry<Range<Long>,BlockLocation> e : >> subRangeMap.asMapOfRanges().entrySet()) { >> - String[] hosts = null; >> - Range<Long> blockRange = e.getKey(); >> + HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap(); >> + long start = rowGroup.getStart(); >> + long end = start + rowGroup.getLength(); >> + Range<Long> rowGroupRange = Range.closedOpen(start, end); >> + >> + // Find submap of ranges that intersect with the rowGroup >> + ImmutableRangeMap<Long,BlockLocation> subRangeMap = >> blockMap.subRangeMap(rowGroupRange); >> + >> + // Iterate through each block in this submap and get the host for >> the block location >> + for (Map.Entry<Range<Long>,BlockLocation> block : >> subRangeMap.asMapOfRanges().entrySet()) { >> + String[] hosts; >> + Range<Long> blockRange = block.getKey(); >> try { >> - hosts = e.getValue().getHosts(); >> - } catch (IOException ioe) { /*TODO Handle this exception */} >> - Range<Long> intersection = entryRange.intersection(blockRange); >> + hosts = block.getValue().getHosts(); >> + } catch (IOException ioe) { >> + throw new RuntimeException("Failed to get hosts for block >> location", ioe); >> + } >> + Range<Long> intersection = rowGroupRange.intersection(blockRange); >> long bytes = intersection.upperEndpoint() - >> intersection.lowerEndpoint(); >> + >> + // For each host in the current block location, add the >> intersecting bytes to the 
corresponding endpoint >> for (String host : hosts) { >> - if (hostMap.containsKey(host)) { >> - hostMap.put(host, hostMap.get(host) + bytes); >> + DrillbitEndpoint endpoint = getDrillBitEndpoint(host); >> + if (endpointByteMap.containsKey(endpoint)) { >> + endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) + >> bytes); >> } else { >> - hostMap.put(host, bytes); >> + if (endpoint != null ) endpointByteMap.put(endpoint, bytes); >> } >> } >> } >> - HashMap<DrillbitEndpoint,Long> ebs = new HashMap(); >> - try { >> - for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) { >> - String host = hostEntry.getKey(); >> - Long bytes = hostEntry.getValue(); >> - DrillbitEndpoint d = getDrillBitEndpoint(host); >> - if (d != null ) ebs.put(d, bytes); >> - } >> - } catch (NullPointerException n) {} >> - entry.setEndpointBytes(ebs); >> - long tB = System.nanoTime(); >> - logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) / >> 1e6); >> + >> + rowGroup.setEndpointBytes(endpointByteMap); >> + rowGroup.setMaxBytes(endpointByteMap.size() > 0 ? 
>> Collections.max(endpointByteMap.values()) : 0); >> + logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(), >> rowGroup.getStart(), rowGroup.getMaxBytes()); >> + watch.stop(); >> + logger.debug("Took {} ms to set endpoint bytes", >> watch.elapsed(TimeUnit.MILLISECONDS)); >> } >> >> private DrillbitEndpoint getDrillBitEndpoint(String hostName) { >> return endPointMap.get(hostName); >> } >> >> + /** >> + * Builds a mapping of drillbit endpoints to hostnames >> + */ >> private void buildEndpointMap() { >> - long tA = System.nanoTime(); >> + watch.reset(); >> + watch.start(); >> endPointMap = new HashMap<String, DrillbitEndpoint>(); >> for (DrillbitEndpoint d : endpoints) { >> String hostName = d.getAddress(); >> endPointMap.put(hostName, d); >> } >> - long tB = System.nanoTime(); >> - logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) / >> 1e6); >> + watch.stop(); >> + logger.debug("Took {} ms to build endpoint map", >> watch.elapsed(TimeUnit.MILLISECONDS)); >> } >> } >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java >> ---------------------------------------------------------------------- >> diff --git >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java >> index 9e48d33..64ced87 100644 >> --- >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java >> +++ >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java >> @@ -18,14 +18,13 @@ >> package org.apache.drill.exec.store.parquet; >> >> import java.io.IOException; >> -import java.util.ArrayList; >> -import java.util.Collection; >> -import java.util.Collections; >> -import 
java.util.Comparator; >> -import java.util.HashMap; >> -import java.util.LinkedList; >> -import java.util.List; >> +import java.util.*; >> +import java.util.concurrent.TimeUnit; >> >> +import com.google.common.base.Stopwatch; >> +import com.google.common.collect.ArrayListMultimap; >> +import com.google.common.collect.Lists; >> +import com.google.common.collect.Multimap; >> import org.apache.drill.common.config.DrillConfig; >> import org.apache.drill.exec.exception.SetupException; >> import org.apache.drill.exec.physical.EndpointAffinity; >> @@ -59,8 +58,9 @@ import com.google.common.base.Preconditions; >> public class ParquetGroupScan extends AbstractGroupScan { >> static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class); >> >> - private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings; >> + private ArrayListMultimap<Integer, >> ParquetRowGroupScan.RowGroupReadEntry> mappings; >> private List<RowGroupInfo> rowGroupInfos; >> + private Stopwatch watch = new Stopwatch(); >> >> public List<ReadEntryWithPath> getEntries() { >> return entries; >> @@ -110,16 +110,14 @@ public class ParquetGroupScan extends >> AbstractGroupScan { >> } >> >> private void readFooter() throws IOException { >> - long tA = System.nanoTime(); >> + watch.reset(); >> + watch.start(); >> rowGroupInfos = new ArrayList(); >> long start = 0, length = 0; >> ColumnChunkMetaData columnChunkMetaData; >> for (ReadEntryWithPath readEntryWithPath : entries){ >> Path path = new Path(readEntryWithPath.getPath()); >> ParquetMetadata footer = >> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path); >> -// FileSystem fs = >> FileSystem.get(this.storageEngine.getHadoopConfig()); >> -// FileStatus status = fs.getFileStatus(path); >> -// ParquetMetadata footer = >> ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status); >> readEntryWithPath.getPath(); >> >> int i = 0; >> @@ -138,38 +136,21 @@ public class ParquetGroupScan 
extends >> AbstractGroupScan { >> i++; >> } >> } >> - long tB = System.nanoTime(); >> - logger.debug("Took {} ms to get row group infos", (float)(tB - tA) / >> 1E6); >> + watch.stop(); >> + logger.debug("Took {} ms to get row group infos", >> watch.elapsed(TimeUnit.MILLISECONDS)); >> } >> >> private void calculateEndpointBytes() { >> - long tA = System.nanoTime(); >> + watch.reset(); >> + watch.start(); >> AffinityCalculator ac = new AffinityCalculator(fileName, fs, >> availableEndpoints); >> for (RowGroupInfo e : rowGroupInfos) { >> ac.setEndpointBytes(e); >> totalBytes += e.getLength(); >> } >> - long tB = System.nanoTime(); >> - logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB - >> tA) / 1E6); >> + watch.stop(); >> + logger.debug("Took {} ms to calculate EndpointBytes", >> watch.elapsed(TimeUnit.MILLISECONDS)); >> } >> -/* >> - public LinkedList<RowGroupInfo> getRowGroups() { >> - return rowGroups; >> - } >> - >> - public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) { >> - this.rowGroups = rowGroups; >> - } >> - >> - public static class ParquetFileReadEntry { >> - >> - String path; >> - >> - public ParquetFileReadEntry(@JsonProperty String path){ >> - this.path = path; >> - } >> - } >> - */ >> >> @JsonIgnore >> public FileSystem getFileSystem() { >> @@ -232,16 +213,22 @@ public class ParquetGroupScan extends >> AbstractGroupScan { >> } >> } >> >> + /** >> + *Calculates the affinity each endpoint has for this scan, by adding >> up the affinity each endpoint has for each >> + * rowGroup >> + * @return a list of EndpointAffinity objects >> + */ >> @Override >> public List<EndpointAffinity> getOperatorAffinity() { >> - long tA = System.nanoTime(); >> + watch.reset(); >> + watch.start(); >> if (this.endpointAffinities == null) { >> HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>(); >> for (RowGroupInfo entry : rowGroupInfos) { >> for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) { >> long bytes = 
entry.getEndpointBytes().get(d); >> float affinity = (float)bytes / (float)totalBytes; >> - logger.error("RowGroup: {} Endpoint: {} Bytes: {}", >> entry.getRowGroupIndex(), d.getAddress(), bytes); >> + logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", >> entry.getRowGroupIndex(), d.getAddress(), bytes); >> if (affinities.keySet().contains(d)) { >> affinities.put(d, affinities.get(d) + affinity); >> } else { >> @@ -256,83 +243,90 @@ public class ParquetGroupScan extends >> AbstractGroupScan { >> } >> this.endpointAffinities = affinityList; >> } >> - long tB = System.nanoTime(); >> - logger.debug("Took {} ms to get operator affinity", (float)(tB - tA) >> / 1E6); >> + watch.stop(); >> + logger.debug("Took {} ms to get operator affinity", >> watch.elapsed(TimeUnit.MILLISECONDS)); >> return this.endpointAffinities; >> } >> >> >> + static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01}; >> >> - >> + /** >> + * >> + * @param incomingEndpoints >> + */ >> @Override >> - public void applyAssignments(List<DrillbitEndpoint> endpoints) { >> - long tA = System.nanoTime(); >> - Preconditions.checkArgument(endpoints.size() <= >> rowGroupInfos.size()); >> - >> - int i = 0; >> - for (DrillbitEndpoint endpoint : endpoints) { >> - logger.debug("Endpoint index {}, endpoint host: {}", i++, >> endpoint.getAddress()); >> + public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) >> { >> + watch.reset(); >> + watch.start(); >> + Preconditions.checkArgument(incomingEndpoints.size() <= >> rowGroupInfos.size()); >> + mappings = ArrayListMultimap.create(); >> + ArrayList rowGroupList = new ArrayList(rowGroupInfos); >> + List<DrillbitEndpoint> endpointLinkedlist = >> Lists.newLinkedList(incomingEndpoints); >> + for(double cutoff : ASSIGNMENT_CUTOFFS ){ >> + scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, >> false); >> } >> - >> - Collections.sort(rowGroupInfos, new ParquetReadEntryComparator()); >> - mappings = new LinkedList[endpoints.size()]; >> 
- LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, >> rowGroupInfos, 100, true, false); >> - LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, >> unassigned, 50, true, false); >> - LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, >> unassigned2, 25, true, false); >> - LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, >> unassigned3, 0, false, false); >> - LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, >> unassigned4, 0, false, true); >> - assert unassigned5.size() == 0 : String.format("All readEntries >> should be assigned by now, but some are still unassigned"); >> - long tB = System.nanoTime(); >> - logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / >> 1E6); >> + scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true); >> + watch.stop(); >> + logger.debug("Took {} ms to apply assignments", >> watch.elapsed(TimeUnit.MILLISECONDS)); >> + Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries >> should be assigned by now, but some are still unassigned"); >> + Preconditions.checkArgument(!rowGroupInfos.isEmpty()); >> } >> >> - private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> >> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean >> mustContain, boolean assignAll) { >> - Collections.sort(rowGroupInfos, new ParquetReadEntryComparator()); >> - LinkedList<RowGroupInfo> unassigned = new LinkedList<>(); >> - >> - int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() * >> 1.5); >> + public int fragmentPointer = 0; >> + >> + /** >> + * >> + * @param endpointAssignments the mapping between fragment/endpoint >> and rowGroup >> + * @param endpoints the list of drillbits, ordered by the >> corresponding fragment >> + * @param rowGroups the list of rowGroups to assign >> + * @param requiredPercentage the percentage of max bytes required to >> make an assignment >> + * @param assignAll if true, will assign even 
if no affinity >> + */ >> + private void scanAndAssign (Multimap<Integer, >> ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, >> List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double >> requiredPercentage, boolean assignAll) { >> + Collections.sort(rowGroups, new ParquetReadEntryComparator()); >> + final boolean requireAffinity = requiredPercentage > 0; >> + int maxAssignments = (int) (rowGroups.size() / endpoints.size()); >> + >> + if (maxAssignments < 1) maxAssignments = 1; >> + >> + for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); >> iter.hasNext();){ >> + RowGroupInfo rowGroupInfo = iter.next(); >> + for (int i = 0; i < endpoints.size(); i++) { >> + int minorFragmentId = (fragmentPointer + i) % endpoints.size(); >> + DrillbitEndpoint currentEndpoint = >> endpoints.get(minorFragmentId); >> + Map<DrillbitEndpoint, Long> bytesPerEndpoint = >> rowGroupInfo.getEndpointBytes(); >> + boolean haveAffinity = >> bytesPerEndpoint.containsKey(currentEndpoint) ; >> >> - if (maxEntries < 1) maxEntries = 1; >> - >> - int i =0; >> - for(RowGroupInfo e : rowGroups) { >> - boolean assigned = false; >> - for (int j = i; j < i + endpoints.size(); j++) { >> - DrillbitEndpoint currentEndpoint = >> endpoints.get(j%endpoints.size()); >> if (assignAll || >> - (e.getEndpointBytes().size() > 0 && >> - (e.getEndpointBytes().containsKey(currentEndpoint) || >> !mustContain) && >> - (mappings[j%endpoints.size()] == null || >> mappings[j%endpoints.size()].size() < maxEntries) && >> - e.getEndpointBytes().get(currentEndpoint) >= >> e.getMaxBytes() * requiredPercentage / 100)) { >> - LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = >> mappings[j%endpoints.size()]; >> - if(entries == null){ >> - entries = new >> LinkedList<ParquetRowGroupScan.RowGroupReadEntry>(); >> - mappings[j%endpoints.size()] = entries; >> - } >> - entries.add(e.getRowGroupReadEntry()); >> - logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", >> e.getPath(), 
e.getStart(), currentEndpoint.getAddress()); >> - assigned = true; >> + (!bytesPerEndpoint.isEmpty() && >> + (!requireAffinity || haveAffinity) && >> + >> (!endpointAssignments.containsKey(minorFragmentId) || >> endpointAssignments.get(minorFragmentId).size() < maxAssignments) && >> + bytesPerEndpoint.get(currentEndpoint) >= >> rowGroupInfo.getMaxBytes() * requiredPercentage)) { >> + >> + endpointAssignments.put(minorFragmentId, >> rowGroupInfo.getRowGroupReadEntry()); >> + logger.debug("Assigned rowGroup {} to minorFragmentId {} >> endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, >> endpoints.get(minorFragmentId).getAddress()); >> + iter.remove(); >> + fragmentPointer = (minorFragmentId + 1) % endpoints.size(); >> break; >> } >> } >> - if (!assigned) unassigned.add(e); >> - i++; >> + >> } >> - return unassigned; >> } >> >> @Override >> public ParquetRowGroupScan getSpecificScan(int minorFragmentId) { >> - assert minorFragmentId < mappings.length : String.format("Mappings >> length [%d] should be longer than minor fragment id [%d] but it isn't.", >> mappings.length, minorFragmentId); >> - for (ParquetRowGroupScan.RowGroupReadEntry rg : >> mappings[minorFragmentId]) { >> + assert minorFragmentId < mappings.size() : String.format("Mappings >> length [%d] should be longer than minor fragment id [%d] but it isn't.", >> mappings.size(), minorFragmentId); >> + for (ParquetRowGroupScan.RowGroupReadEntry rg : >> mappings.get(minorFragmentId)) { >> logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: >> {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex()); >> } >> + >> Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), >> String.format("MinorFragmentId %d has no read entries assigned", >> minorFragmentId)); >> try { >> - return new ParquetRowGroupScan(storageEngine, engineConfig, >> mappings[minorFragmentId]); >> + return new ParquetRowGroupScan(storageEngine, engineConfig, >> mappings.get(minorFragmentId)); >> } catch 
(SetupException e) { >> - e.printStackTrace(); // TODO - fix this >> + throw new RuntimeException("Error setting up ParquetRowGroupScan", >> e); >> } >> - return null; >> } >> >> @Override >> @@ -342,7 +336,8 @@ public class ParquetGroupScan extends >> AbstractGroupScan { >> >> @Override >> public OperatorCost getCost() { >> - return new OperatorCost(1,1,1,1); >> + //TODO Figure out how to properly calculate cost >> + return new OperatorCost(1,rowGroupInfos.size(),1,1); >> } >> >> @Override >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java >> ---------------------------------------------------------------------- >> diff --git >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java >> index 4e46034..3aaa987 100644 >> --- >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java >> +++ >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java >> @@ -211,8 +211,8 @@ public class ParquetRecordReader implements >> RecordReader { >> } >> for (VarLenBinaryReader.VarLengthColumn r : >> varLengthReader.columns) { >> output.addField(r.valueVecHolder.getValueVector()); >> - output.setNewSchema(); >> } >> + output.setNewSchema(); >> }catch(SchemaChangeException e) { >> throw new ExecutionSetupException("Error setting up output >> mutator.", e); >> } >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java >> ---------------------------------------------------------------------- >> diff --git >> 
a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java >> index 03fb4ec..addd288 100644 >> --- >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java >> +++ >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java >> @@ -21,7 +21,9 @@ import java.io.IOException; >> import java.text.SimpleDateFormat; >> import java.util.Date; >> import java.util.List; >> +import java.util.concurrent.TimeUnit; >> >> +import com.google.common.base.Stopwatch; >> import org.apache.drill.common.exceptions.ExecutionSetupException; >> import org.apache.drill.exec.ops.FragmentContext; >> import org.apache.drill.exec.physical.impl.BatchCreator; >> @@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader; >> import parquet.hadoop.metadata.ParquetMetadata; >> >> public class ParquetScanBatchCreator implements >> BatchCreator<ParquetRowGroupScan>{ >> - static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class); >> + static final org.slf4j.Logger logger = >> org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class); >> >> @Override >> public RecordBatch getBatch(FragmentContext context, >> ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws >> ExecutionSetupException { >> - long tA = System.nanoTime(), tB; >> - System.out.println( new SimpleDateFormat("mm:ss S").format(new >> Date()) + " :Start of ScanBatCreator.scanBatch"); >> + Stopwatch watch = new Stopwatch(); >> + watch.start(); >> Preconditions.checkArgument(children.isEmpty()); >> List<RecordReader> readers = Lists.newArrayList(); >> for(ParquetRowGroupScan.RowGroupReadEntry e : >> rowGroupScan.getRowGroupReadEntries()){ >> @@ -68,7 +70,7 @@ public class 
ParquetScanBatchCreator implements >> BatchCreator<ParquetRowGroupScan >> throw new ExecutionSetupException(e1); >> } >> } >> - System.out.println( "Total time in method: " + ((float) >> (System.nanoTime() - tA) / 1e9)); >> + logger.debug("total time in ScanBatchCreator.getBatch: {} ms", >> watch.elapsed(TimeUnit.MILLISECONDS)); >> return new ScanBatch(context, readers.iterator()); >> } >> } >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java >> ---------------------------------------------------------------------- >> diff --git >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java >> index 9a33109..72c5f34 100644 >> --- >> a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java >> +++ >> b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java >> @@ -35,8 +35,8 @@ public class ErrorHelper { >> if(message != null){ >> sb.append(message); >> } >> - >> - do{ >> + >> + while (true) { >> sb.append(" < "); >> sb.append(t.getClass().getSimpleName()); >> if(t.getMessage() != null){ >> @@ -44,7 +44,9 @@ public class ErrorHelper { >> sb.append(t.getMessage()); >> sb.append(" ]"); >> } >> - }while(t.getCause() != null && t.getCause() != t); >> + if (t.getCause() == null || t.getCause() == t) break; >> + t = t.getCause(); >> + } >> >> builder.setMessage(sb.toString()); >> >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java >> ---------------------------------------------------------------------- >> diff --git >> 
a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java >> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java >> index e2a00f1..18ac294 100644 >> --- >> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java >> +++ >> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java >> @@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan; >> import org.apache.drill.exec.planner.PhysicalPlanReader; >> import org.apache.drill.exec.proto.CoordinationProtos; >> import org.apache.drill.exec.proto.UserProtos; >> +import org.apache.drill.exec.record.RecordBatchLoader; >> +import org.apache.drill.exec.record.VectorWrapper; >> +import org.apache.drill.exec.rpc.RpcException; >> import org.apache.drill.exec.rpc.user.QueryResultBatch; >> +import org.apache.drill.exec.rpc.user.UserResultsListener; >> +import org.apache.drill.exec.server.BootStrapContext; >> import org.apache.drill.exec.server.Drillbit; >> import org.apache.drill.exec.server.RemoteServiceSet; >> import org.apache.drill.exec.store.parquet.ParquetGroupScan; >> +import org.apache.drill.exec.vector.ValueVector; >> import org.apache.hadoop.fs.BlockLocation; >> import org.apache.hadoop.fs.FileStatus; >> import org.apache.hadoop.fs.FileSystem; >> @@ -29,6 +35,7 @@ import java.io.IOException; >> import java.nio.charset.Charset; >> import java.util.LinkedList; >> import java.util.List; >> +import java.util.concurrent.CountDownLatch; >> >> import static junit.framework.Assert.assertNull; >> import static org.junit.Assert.assertEquals; >> @@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan { >> >> //public String fileName = "/physical_test2.json"; >> public String fileName = "parquet_scan_union_screen_physical.json"; >> +// public String fileName = "parquet-sample.json"; >> + >> >> @Test >> @Ignore >> @@ -49,7 
+58,51 @@ public class TestParquetPhysicalPlan { >> bit1.run(); >> client.connect(); >> List<QueryResultBatch> results = >> client.runQuery(UserProtos.QueryType.PHYSICAL, >> Resources.toString(Resources.getResource(fileName),Charsets.UTF_8)); >> - System.out.println(String.format("Got %d results", >> results.size())); >> + RecordBatchLoader loader = new >> RecordBatchLoader(bit1.getContext().getAllocator()); >> + for (QueryResultBatch b : results) { >> + System.out.println(String.format("Got %d results", >> b.getHeader().getRowCount())); >> + loader.load(b.getHeader().getDef(), b.getData()); >> + for (VectorWrapper vw : loader) { >> + System.out.println(vw.getValueVector().getField().getName()); >> + ValueVector vv = vw.getValueVector(); >> + for (int i = 0; i < vv.getAccessor().getValueCount(); i++) { >> + Object o = vv.getAccessor().getObject(i); >> + System.out.println(vv.getAccessor().getObject(i)); >> + } >> + } >> + } >> + client.close(); >> + } >> + } >> + >> + private class ParquetResultsListener implements UserResultsListener { >> + private CountDownLatch latch = new CountDownLatch(1); >> + @Override >> + public void submissionFailed(RpcException ex) { >> + logger.error("submission failed", ex); >> + latch.countDown(); >> + } >> + >> + @Override >> + public void resultArrived(QueryResultBatch result) { >> + System.out.printf("Result batch arrived. 
Number of records: %d", >> result.getHeader().getRowCount()); >> + if (result.getHeader().getIsLastChunk()) latch.countDown(); >> + } >> + >> + public void await() throws Exception { >> + latch.await(); >> + } >> + } >> + @Test >> + @Ignore >> + public void testParseParquetPhysicalPlanRemote() throws Exception { >> + DrillConfig config = DrillConfig.create(); >> + >> + try(DrillClient client = new DrillClient(config);){ >> + client.connect(); >> + ParquetResultsListener listener = new ParquetResultsListener(); >> + client.runQuery(UserProtos.QueryType.PHYSICAL, >> Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), >> listener); >> + listener.await(); >> client.close(); >> } >> } >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java >> ---------------------------------------------------------------------- >> diff --git >> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java >> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java >> index 1d91455..7a99c3f 100644 >> --- >> a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java >> +++ >> b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java >> @@ -48,6 +48,7 @@ import >> org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo; >> import org.apache.drill.exec.vector.BaseDataValueVector; >> import org.apache.drill.exec.vector.ValueVector; >> import org.junit.BeforeClass; >> +import org.junit.Ignore; >> import org.junit.Test; >> >> import parquet.bytes.BytesInput; >> @@ -68,6 +69,7 @@ public class ParquetRecordReaderTest { >> static final org.slf4j.Logger logger = >> 
org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class); >> >> private boolean VERBOSE_DEBUG = false; >> + private boolean checkValues = true; >> >> static final int numberRowGroups = 20; >> static final int recordsPerRowGroup = 300000; >> @@ -100,6 +102,20 @@ public class ParquetRecordReaderTest { >> testParquetFullEngineLocalText(planText, fileName, i, >> numberRowGroups, recordsPerRowGroup); >> } >> >> + @Test >> + @Ignore >> + public void testLocalDistributed() throws Exception { >> + String planName = "/parquet/parquet_scan_union_screen_physical.json"; >> + testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, >> 300000); >> + } >> + >> + @Test >> + @Ignore >> + public void testRemoteDistributed() throws Exception { >> + String planName = "/parquet/parquet_scan_union_screen_physical.json"; >> + testParquetFullEngineRemote(planName, fileName, 1, 10, 30000); >> + } >> + >> >> private class ParquetResultListener implements UserResultsListener { >> private SettableFuture<Void> future = SettableFuture.create(); >> @@ -155,8 +171,12 @@ public class ParquetRecordReaderTest { >> if (VERBOSE_DEBUG){ >> System.out.print(vv.getAccessor().getObject(j) + ", " + (j % >> 25 == 0 ? 
"\n batch:" + batchCounter + " v:" + j + " - " : "")); >> } >> - assertField(vv, j, (TypeProtos.MinorType) currentField.type, >> - currentField.values[(int) (columnValCounter % 3)], >> (String) currentField.name + "/"); >> + if (checkValues) { >> + try { >> + assertField(vv, j, (TypeProtos.MinorType) >> currentField.type, >> + currentField.values[(int) (columnValCounter % 3)], >> (String) currentField.name + "/"); >> + } catch (AssertionError e) { submissionFailed(new >> RpcException(e)); } >> + } >> columnValCounter++; >> } >> if (VERBOSE_DEBUG){ >> @@ -197,7 +217,9 @@ public class ParquetRecordReaderTest { >> batchCounter++; >> if(result.getHeader().getIsLastChunk()){ >> for (String s : valuesChecked.keySet()) { >> + try { >> assertEquals("Record count incorrect for column: " + s, >> totalRecords, (long) valuesChecked.get(s)); >> + } catch (AssertionError e) { submissionFailed(new >> RpcException(e)); } >> } >> >> assert valuesChecked.keySet().size() > 0; >> @@ -222,11 +244,13 @@ public class ParquetRecordReaderTest { >> >> DrillConfig config = DrillConfig.create(); >> >> + checkValues = false; >> + >> try(DrillClient client = new DrillClient(config);){ >> client.connect(); >> RecordBatchLoader batchLoader = new >> RecordBatchLoader(client.getAllocator()); >> ParquetResultListener resultListener = new >> ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, >> numberOfTimesRead); >> - client.runQuery(UserProtos.QueryType.LOGICAL, >> Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), >> resultListener); >> + client.runQuery(UserProtos.QueryType.PHYSICAL, >> Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), >> resultListener); >> resultListener.get(); >> } >> >> @@ -259,6 +283,28 @@ public class ParquetRecordReaderTest { >> } >> >> >> + //use this method to submit physical plan >> + public void testParquetFullEngineLocalTextDistributed(String planName, >> String filename, int numberOfTimesRead /* specified in 
json plan */, int >> numberOfRowGroups, int recordsPerRowGroup) throws Exception{ >> + >> + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); >> + >> + checkValues = false; >> + >> + DrillConfig config = DrillConfig.create(); >> + >> + try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient >> client = new DrillClient(config, serviceSet.getCoordinator());){ >> + bit1.run(); >> + client.connect(); >> + RecordBatchLoader batchLoader = new >> RecordBatchLoader(client.getAllocator()); >> + ParquetResultListener resultListener = new >> ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, >> numberOfTimesRead); >> + Stopwatch watch = new Stopwatch().start(); >> + client.runQuery(UserProtos.QueryType.PHYSICAL, >> Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8), >> resultListener); >> + resultListener.get(); >> + System.out.println(String.format("Took %d ms to run query", >> watch.elapsed(TimeUnit.MILLISECONDS))); >> + >> + } >> + >> + } >> >> public String pad(String value, int length) { >> return pad(value, length, " "); >> >> >> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json >> ---------------------------------------------------------------------- >> diff --git >> a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json >> b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json >> index f508d09..5efecaf 100644 >> --- >> a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json >> +++ >> b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json >> @@ -11,10 +11,7 @@ >> @id : 1, >> entries : [ >> { >> - path : "/tmp/testParquetFile_many_types_3" >> - }, >> - { >> - path : 
"/tmp/testParquetFile_many_types_3" >> + path : "/tmp/parquet_test_file_many_types" >> } >> ], >> storageengine:{ >> >> > > > -- > Regards, > Tanujit > -- Regards, Tanujit
