Hi, when I try mvn install after these changes, org.apache.drill.exec.store.parquet.ParquetRecordReaderTest hangs. This is the last output before it stops:
Tests run: 3, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 1.596 sec - in org.apache.drill.exec.expr.ExpressionTest
Running org.apache.drill.exec.store.TestAffinityCalculator
Took 0.616287 ms to build range map
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.137 sec - in org.apache.drill.exec.store.TestAffinityCalculator
Running org.apache.drill.exec.store.parquet.ParquetRecordReaderTest
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Environment is Fedora 18 with OpenJDK 1.7. With -DskipTests everything compiles fine.
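(The SLF4J lines just mean there is no logger binding on the test classpath, so log output is discarded; I doubt they are related to the hang.) While I dig further, I am guarding the suspect test with a JUnit timeout locally so the build fails fast instead of blocking. A minimal sketch of the idea -- the class and method names here are mine, not from the patch, and the five-minute value is arbitrary:

    import org.junit.Test;

    public class HangGuardExample {          // hypothetical stand-in for the hanging test
      // JUnit 4 runs the body in a separate thread and fails the test once the
      // timeout elapses, so "mvn install" reports a failure instead of hanging.
      @Test(timeout = 300000)                // 5 minutes, arbitrary
      public void fullEngineParquetRead() throws Exception {
        // invoke the code path that hangs here
      }
    }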
Regards,
Tanujit

On Fri, Aug 23, 2013 at 5:36 AM, <[email protected]> wrote:
> DRILL-176: Updates to affinity calculator, fixes for parquet serialization. Fix to ErrorHelper looping
>
>
> Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
> Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617
> Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617
> Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617
>
> Branch: refs/heads/master
> Commit: 7edd36170a9be291a69e44f6090474193485bf14
> Parents: d6ae53e
> Author: Steven Phillips <[email protected]>
> Authored: Thu Aug 22 16:18:55 2013 -0700
> Committer: Jacques Nadeau <[email protected]>
> Committed: Thu Aug 22 16:18:55 2013 -0700
>
> ----------------------------------------------------------------------
>  .../drill/exec/planner/fragment/Wrapper.java    |   5 +-
>  .../drill/exec/store/AffinityCalculator.java    |  91 ++++++----
>  .../exec/store/parquet/ParquetGroupScan.java    | 177 +++++++++----------
>  .../exec/store/parquet/ParquetRecordReader.java |   2 +-
>  .../store/parquet/ParquetScanBatchCreator.java  |  10 +-
>  .../drill/exec/work/foreman/ErrorHelper.java    |   8 +-
>  .../exec/store/TestParquetPhysicalPlan.java     |  55 +++++-
>  .../store/parquet/ParquetRecordReaderTest.java  |  52 +++++-
>  .../parquet_scan_union_screen_physical.json     |   5 +-
>  9 files changed, 257 insertions(+), 148 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> index d5a24b0..8c4b0b4 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
> @@ -151,15 +151,12 @@ public class Wrapper {
>        for (int i = start; i < start + width; i++) {
>          endpoints.add(all.get(i % div));
>        }
> -    } else if (values.size() < width) {
> -      throw new NotImplementedException(
> -          "Haven't implemented a scenario where we have some node affinity but the affinity list is smaller than the expected width.");
>      } else {
>        // get nodes with highest affinity.
>        Collections.sort(values);
>        values = Lists.reverse(values);
>        for (int i = 0; i < width; i++) {
> -        endpoints.add(values.get(i).getEndpoint());
> +        endpoints.add(values.get(i%values.size()).getEndpoint());
>        }
>      }
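Commenting inline as I read the patch. This hunk drops the NotImplementedException path: when the affinity list is shorter than the requested width, the loop now wraps around with i % values.size() instead of throwing. A toy version of the wrap-around selection (names are mine, not Drill's):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class WrapAroundPick {
      // Pick `width` endpoints from a ranked list, reusing entries when the list is short.
      static List<String> pick(List<String> ranked, int width) {
        List<String> out = new ArrayList<String>();
        for (int i = 0; i < width; i++) {
          out.add(ranked.get(i % ranked.size()));  // wraps instead of throwing
        }
        return out;
      }

      public static void main(String[] args) {
        // 2 candidates, width 5 -> [a, b, a, b, a]
        System.out.println(pick(Arrays.asList("a", "b"), 5));
      }
    }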
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> index b4092cc..b341ea4 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java
> @@ -1,6 +1,7 @@
>  package org.apache.drill.exec.store;
>
>
> +import com.google.common.base.Stopwatch;
>  import com.google.common.collect.ImmutableRangeMap;
>  import com.google.common.collect.Range;
>  import org.apache.drill.exec.store.parquet.ParquetGroupScan;
> @@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path;
>
>  import java.io.IOException;
>  import java.util.*;
> +import java.util.concurrent.TimeUnit;
>
>  public class AffinityCalculator {
>    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class);
> @@ -24,6 +26,7 @@ public class AffinityCalculator {
>    String fileName;
>    Collection<DrillbitEndpoint> endpoints;
>    HashMap<String,DrillbitEndpoint> endPointMap;
> +  Stopwatch watch = new Stopwatch();
>
>    public AffinityCalculator(String fileName, FileSystem fs, Collection<DrillbitEndpoint> endpoints) {
>      this.fs = fs;
> @@ -33,16 +36,20 @@ public class AffinityCalculator {
>      buildEndpointMap();
>    }
>
> +  /**
> +   * Builds a mapping of block locations to file byte range
> +   */
>    private void buildBlockMap() {
>      try {
> +      watch.start();
>        FileStatus file = fs.getFileStatus(new Path(fileName));
> -      long tC = System.nanoTime();
>        blocks = fs.getFileBlockLocations(file, 0 , file.getLen());
> -      long tD = System.nanoTime();
> +      watch.stop();
>        logger.debug("Block locations: {}", blocks);
> -      logger.debug("Took {} ms to get Block locations", (float)(tD - tC) / 1e6);
> +      logger.debug("Took {} ms to get Block locations", watch.elapsed(TimeUnit.MILLISECONDS));
>      } catch (IOException ioe) { throw new RuntimeException(ioe); }
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>();
>      for (BlockLocation block : blocks) {
>        long start = block.getOffset();
> @@ -51,62 +58,72 @@ public class AffinityCalculator {
>        blockMapBuilder = blockMapBuilder.put(range, block);
>      }
>      blockMap = blockMapBuilder.build();
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6);
> +    watch.stop();
> +    logger.debug("Took {} ms to build block map", watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>    /**
> +   * For a given RowGroup, calculate how many bytes are available on each drillbit endpoint
>     *
> -   * @param entry
> +   * @param rowGroup the RowGroup to calculate endpoint bytes for
>     */
> -  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) {
> -    long tA = System.nanoTime();
> +  public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) {
> +    watch.reset();
> +    watch.start();
>      HashMap<String,Long> hostMap = new HashMap<>();
> -    long start = entry.getStart();
> -    long end = start + entry.getLength();
> -    Range<Long> entryRange = Range.closedOpen(start, end);
> -    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(entryRange);
> -    for (Map.Entry<Range<Long>,BlockLocation> e : subRangeMap.asMapOfRanges().entrySet()) {
> -      String[] hosts = null;
> -      Range<Long> blockRange = e.getKey();
> +    HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap();
> +    long start = rowGroup.getStart();
> +    long end = start + rowGroup.getLength();
> +    Range<Long> rowGroupRange = Range.closedOpen(start, end);
> +
> +    // Find submap of ranges that intersect with the rowGroup
> +    ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
> +
> +    // Iterate through each block in this submap and get the host for the block location
> +    for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
> +      String[] hosts;
> +      Range<Long> blockRange = block.getKey();
>        try {
> -        hosts = e.getValue().getHosts();
> -      } catch (IOException ioe) { /*TODO Handle this exception */}
> -      Range<Long> intersection = entryRange.intersection(blockRange);
> +        hosts = block.getValue().getHosts();
> +      } catch (IOException ioe) {
> +        throw new RuntimeException("Failed to get hosts for block location", ioe);
> +      }
> +      Range<Long> intersection = rowGroupRange.intersection(blockRange);
>        long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint();
> +
> +      // For each host in the current block location, add the intersecting bytes to the corresponding endpoint
>        for (String host : hosts) {
> -        if (hostMap.containsKey(host)) {
> -          hostMap.put(host, hostMap.get(host) + bytes);
> +        DrillbitEndpoint endpoint = getDrillBitEndpoint(host);
> +        if (endpointByteMap.containsKey(endpoint)) {
> +          endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) + bytes);
>          } else {
> -          hostMap.put(host, bytes);
> +          if (endpoint != null ) endpointByteMap.put(endpoint, bytes);
>          }
>        }
>      }
> -    HashMap<DrillbitEndpoint,Long> ebs = new HashMap();
> -    try {
> -      for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) {
> -        String host = hostEntry.getKey();
> -        Long bytes = hostEntry.getValue();
> -        DrillbitEndpoint d = getDrillBitEndpoint(host);
> -        if (d != null ) ebs.put(d, bytes);
> -      }
> -    } catch (NullPointerException n) {}
> -    entry.setEndpointBytes(ebs);
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) / 1e6);
> +
> +    rowGroup.setEndpointBytes(endpointByteMap);
> +    rowGroup.setMaxBytes(endpointByteMap.size() > 0 ? Collections.max(endpointByteMap.values()) : 0);
> +    logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(), rowGroup.getStart(), rowGroup.getMaxBytes());
> +    watch.stop();
> +    logger.debug("Took {} ms to set endpoint bytes", watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>
>    private DrillbitEndpoint getDrillBitEndpoint(String hostName) {
>      return endPointMap.get(hostName);
>    }
>
> +  /**
> +   * Builds a mapping of drillbit endpoints to hostnames
> +   */
>    private void buildEndpointMap() {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      endPointMap = new HashMap<String, DrillbitEndpoint>();
>      for (DrillbitEndpoint d : endpoints) {
>        String hostName = d.getAddress();
>        endPointMap.put(hostName, d);
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) / 1e6);
> +    watch.stop();
> +    logger.debug("Took {} ms to build endpoint map", watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>  }
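A side note on the timing changes in this file (and the ones below): the System.nanoTime() deltas are replaced by one reusable Guava Stopwatch field, and each measurement is bracketed by reset()/start()/stop(). That ordering matters: start() on an already-running Stopwatch throws IllegalStateException, and without reset() the elapsed time accumulates across measurements. A standalone sketch of the pattern as I understand it:

    import java.util.concurrent.TimeUnit;
    import com.google.common.base.Stopwatch;

    public class StopwatchPattern {
      public static void main(String[] args) throws InterruptedException {
        Stopwatch watch = new Stopwatch();  // matches the Guava version used here;
                                            // newer Guava wants Stopwatch.createUnstarted()
        watch.start();
        Thread.sleep(50);                   // stand-in for the timed work
        watch.stop();
        System.out.println("first phase: " + watch.elapsed(TimeUnit.MILLISECONDS) + " ms");

        watch.reset();                      // required for a fresh measurement;
        watch.start();                      // otherwise elapsed time accumulates
        Thread.sleep(25);
        watch.stop();
        System.out.println("second phase: " + watch.elapsed(TimeUnit.MILLISECONDS) + " ms");
      }
    }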
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> index 9e48d33..64ced87 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
> @@ -18,14 +18,13 @@
>  package org.apache.drill.exec.store.parquet;
>
>  import java.io.IOException;
> -import java.util.ArrayList;
> -import java.util.Collection;
> -import java.util.Collections;
> -import java.util.Comparator;
> -import java.util.HashMap;
> -import java.util.LinkedList;
> -import java.util.List;
> +import java.util.*;
> +import java.util.concurrent.TimeUnit;
>
> +import com.google.common.base.Stopwatch;
> +import com.google.common.collect.ArrayListMultimap;
> +import com.google.common.collect.Lists;
> +import com.google.common.collect.Multimap;
>  import org.apache.drill.common.config.DrillConfig;
>  import org.apache.drill.exec.exception.SetupException;
>  import org.apache.drill.exec.physical.EndpointAffinity;
> @@ -59,8 +58,9 @@ import com.google.common.base.Preconditions;
>  public class ParquetGroupScan extends AbstractGroupScan {
>    static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class);
>
> -  private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings;
> +  private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings;
>    private List<RowGroupInfo> rowGroupInfos;
> +  private Stopwatch watch = new Stopwatch();
>
>    public List<ReadEntryWithPath> getEntries() {
>      return entries;
> @@ -110,16 +110,14 @@ public class ParquetGroupScan extends AbstractGroupScan {
>    }
>
>    private void readFooter() throws IOException {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      rowGroupInfos = new ArrayList();
>      long start = 0, length = 0;
>      ColumnChunkMetaData columnChunkMetaData;
>      for (ReadEntryWithPath readEntryWithPath : entries){
>        Path path = new Path(readEntryWithPath.getPath());
>        ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path);
> -//      FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig());
> -//      FileStatus status = fs.getFileStatus(path);
> -//      ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status);
>        readEntryWithPath.getPath();
>
>        int i = 0;
> @@ -138,38 +136,21 @@ public class ParquetGroupScan extends AbstractGroupScan {
>          i++;
>        }
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to get row group infos", (float)(tB - tA) / 1E6);
> +    watch.stop();
> +    logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
>    }
>
>    private void calculateEndpointBytes() {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      AffinityCalculator ac = new AffinityCalculator(fileName, fs, availableEndpoints);
>      for (RowGroupInfo e : rowGroupInfos) {
>        ac.setEndpointBytes(e);
>        totalBytes += e.getLength();
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB - tA) / 1E6);
> +    watch.stop();
> +    logger.debug("Took {} ms to calculate EndpointBytes", watch.elapsed(TimeUnit.MILLISECONDS));
>    }
> -/*
> -  public LinkedList<RowGroupInfo> getRowGroups() {
> -    return rowGroups;
> -  }
> -
> -  public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) {
> -    this.rowGroups = rowGroups;
> -  }
> -
> -  public static class ParquetFileReadEntry {
> -
> -    String path;
> -
> -    public ParquetFileReadEntry(@JsonProperty String path){
> -      this.path = path;
> -    }
> -  }
> - */
>
>    @JsonIgnore
>    public FileSystem getFileSystem() {
> @@ -232,16 +213,22 @@ public class ParquetGroupScan extends AbstractGroupScan {
>      }
>    }
>
> +  /**
> +   * Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each
> +   * rowGroup
> +   * @return a list of EndpointAffinity objects
> +   */
>    @Override
>    public List<EndpointAffinity> getOperatorAffinity() {
> -    long tA = System.nanoTime();
> +    watch.reset();
> +    watch.start();
>      if (this.endpointAffinities == null) {
>        HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>();
>        for (RowGroupInfo entry : rowGroupInfos) {
>          for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) {
>            long bytes = entry.getEndpointBytes().get(d);
>            float affinity = (float)bytes / (float)totalBytes;
> -          logger.error("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
> +          logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes);
>            if (affinities.keySet().contains(d)) {
>              affinities.put(d, affinities.get(d) + affinity);
>            } else {
> @@ -256,83 +243,90 @@ public class ParquetGroupScan extends AbstractGroupScan {
>        }
>        this.endpointAffinities = affinityList;
>      }
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to get operator affinity", (float)(tB - tA) / 1E6);
> +    watch.stop();
> +    logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS));
>      return this.endpointAffinities;
>    }
>
> +  static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01};
>
> -
> +  /**
> +   *
> +   * @param incomingEndpoints
> +   */
>    @Override
> -  public void applyAssignments(List<DrillbitEndpoint> endpoints) {
> -    long tA = System.nanoTime();
> -    Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size());
> -
> -    int i = 0;
> -    for (DrillbitEndpoint endpoint : endpoints) {
> -      logger.debug("Endpoint index {}, endpoint host: {}", i++, endpoint.getAddress());
> +  public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
> +    watch.reset();
> +    watch.start();
> +    Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size());
> +    mappings = ArrayListMultimap.create();
> +    ArrayList rowGroupList = new ArrayList(rowGroupInfos);
> +    List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints);
> +    for(double cutoff : ASSIGNMENT_CUTOFFS ){
> +      scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false);
>      }
> -
> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
> -    mappings = new LinkedList[endpoints.size()];
> -    LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, rowGroupInfos, 100, true, false);
> -    LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, unassigned, 50, true, false);
> -    LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, unassigned2, 25, true, false);
> -    LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, unassigned3, 0, false, false);
> -    LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, unassigned4, 0, false, true);
> -    assert unassigned5.size() == 0 : String.format("All readEntries should be assigned by now, but some are still unassigned");
> -    long tB = System.nanoTime();
> -    logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / 1E6);
> +    scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true);
> +    watch.stop();
> +    logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
> +    Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
> +    Preconditions.checkArgument(!rowGroupInfos.isEmpty());
>    }
>
> -  private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean mustContain, boolean assignAll) {
> -    Collections.sort(rowGroupInfos, new ParquetReadEntryComparator());
> -    LinkedList<RowGroupInfo> unassigned = new LinkedList<>();
> -
> -    int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() * 1.5);
> +  public int fragmentPointer = 0;
> +
> +  /**
> +   *
> +   * @param endpointAssignments the mapping between fragment/endpoint and rowGroup
> +   * @param endpoints the list of drillbits, ordered by the corresponding fragment
> +   * @param rowGroups the list of rowGroups to assign
> +   * @param requiredPercentage the percentage of max bytes required to make an assignment
> +   * @param assignAll if true, will assign even if no affinity
> +   */
> +  private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) {
> +    Collections.sort(rowGroups, new ParquetReadEntryComparator());
> +    final boolean requireAffinity = requiredPercentage > 0;
> +    int maxAssignments = (int) (rowGroups.size() / endpoints.size());
> +
> +    if (maxAssignments < 1) maxAssignments = 1;
> +
> +    for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); iter.hasNext();){
> +      RowGroupInfo rowGroupInfo = iter.next();
> +      for (int i = 0; i < endpoints.size(); i++) {
> +        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
> +        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
> +        Map<DrillbitEndpoint, Long> bytesPerEndpoint = rowGroupInfo.getEndpointBytes();
> +        boolean haveAffinity = bytesPerEndpoint.containsKey(currentEndpoint) ;
>
> -    if (maxEntries < 1) maxEntries = 1;
> -
> -    int i =0;
> -    for(RowGroupInfo e : rowGroups) {
> -      boolean assigned = false;
> -      for (int j = i; j < i + endpoints.size(); j++) {
> -        DrillbitEndpoint currentEndpoint = endpoints.get(j%endpoints.size());
>          if (assignAll ||
> -            (e.getEndpointBytes().size() > 0 &&
> -            (e.getEndpointBytes().containsKey(currentEndpoint) || !mustContain) &&
> -            (mappings[j%endpoints.size()] == null || mappings[j%endpoints.size()].size() < maxEntries) &&
> -            e.getEndpointBytes().get(currentEndpoint) >= e.getMaxBytes() * requiredPercentage / 100)) {
> -          LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = mappings[j%endpoints.size()];
> -          if(entries == null){
> -            entries = new LinkedList<ParquetRowGroupScan.RowGroupReadEntry>();
> -            mappings[j%endpoints.size()] = entries;
> -          }
> -          entries.add(e.getRowGroupReadEntry());
> -          logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", e.getPath(), e.getStart(), currentEndpoint.getAddress());
> -          assigned = true;
> +            (!bytesPerEndpoint.isEmpty() &&
> +            (!requireAffinity || haveAffinity) &&
> +            (!endpointAssignments.containsKey(minorFragmentId) || endpointAssignments.get(minorFragmentId).size() < maxAssignments) &&
> +            bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage)) {
> +
> +          endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry());
> +          logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress());
> +          iter.remove();
> +          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
>            break;
>          }
>        }
> -      if (!assigned) unassigned.add(e);
> -      i++;
> +
>      }
> -    return unassigned;
>    }
>
>    @Override
>    public ParquetRowGroupScan getSpecificScan(int minorFragmentId) {
> -    assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId);
> -    for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) {
> +    assert minorFragmentId < mappings.size() : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), minorFragmentId);
> +    for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings.get(minorFragmentId)) {
>        logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex());
>      }
> +    Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId));
>      try {
> -      return new ParquetRowGroupScan(storageEngine, engineConfig, mappings[minorFragmentId]);
> +      return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId));
>      } catch (SetupException e) {
> -      e.printStackTrace(); // TODO - fix this
> +      throw new RuntimeException("Error setting up ParquetRowGroupScan", e);
>      }
> -    return null;
>    }
>
>    @Override
> @@ -342,7 +336,8 @@ public class ParquetGroupScan extends AbstractGroupScan {
>
>    @Override
>    public OperatorCost getCost() {
> -    return new OperatorCost(1,1,1,1);
> +    //TODO Figure out how to properly calculate cost
> +    return new OperatorCost(1,rowGroupInfos.size(),1,1);
>    }
>
>    @Override
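If I read applyAssignments/scanAndAssign correctly, the old array-of-lists plus fixed 100/50/25/0 percent ladder is now a multimap filled by sweeping the unassigned row groups through descending affinity cutoffs, with a final assignAll pass that places whatever is left. A simplified model of that control flow (toy types and made-up affinity numbers, not Drill's):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    public class CutoffAssignSketch {
      // Same shape as ASSIGNMENT_CUTOFFS: descending locality thresholds.
      static final double[] CUTOFFS = {0.99, 0.50, 0.25, 0.01};

      public static void main(String[] args) {
        // best-fragment affinity per row group (invented numbers)
        Map<String, Double> affinity = new HashMap<String, Double>();
        affinity.put("rg0", 1.0);
        affinity.put("rg1", 0.3);
        affinity.put("rg2", 0.0);
        List<String> unassigned = new ArrayList<String>(Arrays.asList("rg0", "rg1", "rg2"));
        List<String> assigned = new ArrayList<String>();

        // Each pass drains the row groups whose locality clears the current cutoff.
        for (double cutoff : CUTOFFS) {
          for (Iterator<String> it = unassigned.iterator(); it.hasNext();) {
            String rg = it.next();
            if (affinity.get(rg) >= cutoff) {
              assigned.add(rg + " @ cutoff " + cutoff);
              it.remove();
            }
          }
        }
        // Mirrors the assignAll=true pass: leftovers are placed regardless of locality.
        for (String rg : unassigned) assigned.add(rg + " @ forced");
        System.out.println(assigned);  // [rg0 @ cutoff 0.99, rg1 @ cutoff 0.25, rg2 @ forced]
      }
    }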
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> index 4e46034..3aaa987 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
> @@ -211,8 +211,8 @@ public class ParquetRecordReader implements RecordReader {
>        }
>        for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
>          output.addField(r.valueVecHolder.getValueVector());
> -        output.setNewSchema();
>        }
> +      output.setNewSchema();
>      }catch(SchemaChangeException e) {
>        throw new ExecutionSetupException("Error setting up output mutator.", e);
>      }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> index 03fb4ec..addd288 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
> @@ -21,7 +21,9 @@ import java.io.IOException;
>  import java.text.SimpleDateFormat;
>  import java.util.Date;
>  import java.util.List;
> +import java.util.concurrent.TimeUnit;
>
> +import com.google.common.base.Stopwatch;
>  import org.apache.drill.common.exceptions.ExecutionSetupException;
>  import org.apache.drill.exec.ops.FragmentContext;
>  import org.apache.drill.exec.physical.impl.BatchCreator;
> @@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader;
>  import parquet.hadoop.metadata.ParquetMetadata;
>
>  public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
> -  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
> +  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
>
>    @Override
>    public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
> -    long tA = System.nanoTime(), tB;
> -    System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ScanBatCreator.scanBatch");
> +    Stopwatch watch = new Stopwatch();
> +    watch.start();
>      Preconditions.checkArgument(children.isEmpty());
>      List<RecordReader> readers = Lists.newArrayList();
>      for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
> @@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
>        throw new ExecutionSetupException(e1);
>      }
>    }
> -    System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9));
> +    logger.debug("total time in ScanBatchCreator.getBatch: {} ms", watch.elapsed(TimeUnit.MILLISECONDS));
>      return new ScanBatch(context, readers.iterator());
>    }
>  }
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> ----------------------------------------------------------------------
> diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> index 9a33109..72c5f34 100644
> --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java
> @@ -35,8 +35,8 @@ public class ErrorHelper {
>      if(message != null){
>        sb.append(message);
>      }
> -
> -    do{
> +
> +    while (true) {
>        sb.append(" < ");
>        sb.append(t.getClass().getSimpleName());
>        if(t.getMessage() != null){
> @@ -44,7 +44,9 @@ public class ErrorHelper {
>          sb.append(t.getMessage());
>          sb.append(" ]");
>        }
> -    }while(t.getCause() != null && t.getCause() != t);
> +      if (t.getCause() == null || t.getCause() == t) break;
> +      t = t.getCause();
> +    }
>
>      builder.setMessage(sb.toString());
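The "Fix to ErrorHelper looping" in the commit message appears to be this hunk: the old do/while tested t.getCause() but never advanced t, so any exception that actually had a cause would spin forever while building the error message -- which from the outside looks exactly like a hang. The new loop advances the cursor and also breaks on self-referential causes. A self-contained sketch of the fixed traversal (names mine, same logic):

    public class CauseChainSketch {
      // Walks a Throwable's cause chain one link at a time, stopping on null
      // or self-referential causes so it cannot loop forever.
      static String describe(String message, Throwable t) {
        StringBuilder sb = new StringBuilder();
        if (message != null) sb.append(message);
        while (true) {
          sb.append(" < ").append(t.getClass().getSimpleName());
          if (t.getMessage() != null) sb.append(" [ ").append(t.getMessage()).append(" ]");
          if (t.getCause() == null || t.getCause() == t) break; // end of chain, or a cycle
          t = t.getCause();                                     // the advance the old do/while was missing
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        Exception root = new IllegalStateException("disk full");
        Exception wrapped = new RuntimeException("query failed", root);
        System.out.println(describe("error", wrapped));
        // -> error < RuntimeException [ query failed ] < IllegalStateException [ disk full ]
      }
    }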
"parquet_scan_union_screen_physical.json"; > +// public String fileName = "parquet-sample.json"; > + > > @Test > @Ignore > @@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan { > bit1.run(); > client.connect(); > List<QueryResultBatch> results = > client.runQuery(UserProtos.QueryType.PHYSICAL, > Resources.toString(Resources.getResource(fileName),Charsets.UTF_8)); > - System.out.println(String.format("Got %d results", results.size())); > + RecordBatchLoader loader = new > RecordBatchLoader(bit1.getContext().getAllocator()); > + for (QueryResultBatch b : results) { > + System.out.println(String.format("Got %d results", > b.getHeader().getRowCount())); > + loader.load(b.getHeader().getDef(), b.getData()); > + for (VectorWrapper vw : loader) { > + System.out.println(vw.getValueVector().getField().getName()); > + ValueVector vv = vw.getValueVector(); > + for (int i = 0; i < vv.getAccessor().getValueCount(); i++) { > + Object o = vv.getAccessor().getObject(i); > + System.out.println(vv.getAccessor().getObject(i)); > + } > + } > + } > + client.close(); > + } > + } > + > + private class ParquetResultsListener implements UserResultsListener { > + private CountDownLatch latch = new CountDownLatch(1); > + @Override > + public void submissionFailed(RpcException ex) { > + logger.error("submission failed", ex); > + latch.countDown(); > + } > + > + @Override > + public void resultArrived(QueryResultBatch result) { > + System.out.printf("Result batch arrived. Number of records: %d", > result.getHeader().getRowCount()); > + if (result.getHeader().getIsLastChunk()) latch.countDown(); > + } > + > + public void await() throws Exception { > + latch.await(); > + } > + } > + @Test > + @Ignore > + public void testParseParquetPhysicalPlanRemote() throws Exception { > + DrillConfig config = DrillConfig.create(); > + > + try(DrillClient client = new DrillClient(config);){ > + client.connect(); > + ParquetResultsListener listener = new ParquetResultsListener(); > + client.runQuery(UserProtos.QueryType.PHYSICAL, > Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), > listener); > + listener.await(); > client.close(); > } > } > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java > ---------------------------------------------------------------------- > diff --git > a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java > b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java > index 1d91455..7a99c3f 100644 > --- > a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java > +++ > b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java > @@ -48,6 +48,7 @@ import > org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo; > import org.apache.drill.exec.vector.BaseDataValueVector; > import org.apache.drill.exec.vector.ValueVector; > import org.junit.BeforeClass; > +import org.junit.Ignore; > import org.junit.Test; > > import parquet.bytes.BytesInput; > @@ -68,6 +69,7 @@ public class ParquetRecordReaderTest { > static final org.slf4j.Logger logger = > org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class); > > private boolean VERBOSE_DEBUG = false; > + private boolean checkValues = true; > > static final int numberRowGroups = 20; 
> static final int recordsPerRowGroup = 300000; > @@ -100,6 +102,20 @@ public class ParquetRecordReaderTest { > testParquetFullEngineLocalText(planText, fileName, i, > numberRowGroups, recordsPerRowGroup); > } > > + @Test > + @Ignore > + public void testLocalDistributed() throws Exception { > + String planName = "/parquet/parquet_scan_union_screen_physical.json"; > + testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, > 300000); > + } > + > + @Test > + @Ignore > + public void testRemoteDistributed() throws Exception { > + String planName = "/parquet/parquet_scan_union_screen_physical.json"; > + testParquetFullEngineRemote(planName, fileName, 1, 10, 30000); > + } > + > > private class ParquetResultListener implements UserResultsListener { > private SettableFuture<Void> future = SettableFuture.create(); > @@ -155,8 +171,12 @@ public class ParquetRecordReaderTest { > if (VERBOSE_DEBUG){ > System.out.print(vv.getAccessor().getObject(j) + ", " + (j % > 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : "")); > } > - assertField(vv, j, (TypeProtos.MinorType) currentField.type, > - currentField.values[(int) (columnValCounter % 3)], (String) > currentField.name + "/"); > + if (checkValues) { > + try { > + assertField(vv, j, (TypeProtos.MinorType) currentField.type, > + currentField.values[(int) (columnValCounter % 3)], > (String) currentField.name + "/"); > + } catch (AssertionError e) { submissionFailed(new > RpcException(e)); } > + } > columnValCounter++; > } > if (VERBOSE_DEBUG){ > @@ -197,7 +217,9 @@ public class ParquetRecordReaderTest { > batchCounter++; > if(result.getHeader().getIsLastChunk()){ > for (String s : valuesChecked.keySet()) { > + try { > assertEquals("Record count incorrect for column: " + s, > totalRecords, (long) valuesChecked.get(s)); > + } catch (AssertionError e) { submissionFailed(new > RpcException(e)); } > } > > assert valuesChecked.keySet().size() > 0; > @@ -222,11 +244,13 @@ public class ParquetRecordReaderTest { > > DrillConfig config = DrillConfig.create(); > > + checkValues = false; > + > try(DrillClient client = new DrillClient(config);){ > client.connect(); > RecordBatchLoader batchLoader = new > RecordBatchLoader(client.getAllocator()); > ParquetResultListener resultListener = new > ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, > numberOfTimesRead); > - client.runQuery(UserProtos.QueryType.LOGICAL, > Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), > resultListener); > + client.runQuery(UserProtos.QueryType.PHYSICAL, > Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), > resultListener); > resultListener.get(); > } > > @@ -259,6 +283,28 @@ public class ParquetRecordReaderTest { > } > > > + //use this method to submit physical plan > + public void testParquetFullEngineLocalTextDistributed(String planName, > String filename, int numberOfTimesRead /* specified in json plan */, int > numberOfRowGroups, int recordsPerRowGroup) throws Exception{ > + > + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); > + > + checkValues = false; > + > + DrillConfig config = DrillConfig.create(); > + > + try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient > client = new DrillClient(config, serviceSet.getCoordinator());){ > + bit1.run(); > + client.connect(); > + RecordBatchLoader batchLoader = new > RecordBatchLoader(client.getAllocator()); > + ParquetResultListener resultListener = new > ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, > 
numberOfTimesRead); > + Stopwatch watch = new Stopwatch().start(); > + client.runQuery(UserProtos.QueryType.PHYSICAL, > Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8), > resultListener); > + resultListener.get(); > + System.out.println(String.format("Took %d ms to run query", > watch.elapsed(TimeUnit.MILLISECONDS))); > + > + } > + > + } > > public String pad(String value, int length) { > return pad(value, length, " "); > > > http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json > ---------------------------------------------------------------------- > diff --git > a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json > b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json > index f508d09..5efecaf 100644 > --- > a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json > +++ > b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json > @@ -11,10 +11,7 @@ > @id : 1, > entries : [ > { > - path : "/tmp/testParquetFile_many_types_3" > - }, > - { > - path : "/tmp/testParquetFile_many_types_3" > + path : "/tmp/parquet_test_file_many_types" > } > ], > storageengine:{ > > -- Regards, Tanujit
