DRILL-176: Updates to affinity calculator, fixes for parquet serialization. Fix to ErrorHelper looping
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/7edd3617 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/7edd3617 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/7edd3617 Branch: refs/heads/master Commit: 7edd36170a9be291a69e44f6090474193485bf14 Parents: d6ae53e Author: Steven Phillips <[email protected]> Authored: Thu Aug 22 16:18:55 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Aug 22 16:18:55 2013 -0700 ---------------------------------------------------------------------- .../drill/exec/planner/fragment/Wrapper.java | 5 +- .../drill/exec/store/AffinityCalculator.java | 91 ++++++---- .../exec/store/parquet/ParquetGroupScan.java | 177 +++++++++---------- .../exec/store/parquet/ParquetRecordReader.java | 2 +- .../store/parquet/ParquetScanBatchCreator.java | 10 +- .../drill/exec/work/foreman/ErrorHelper.java | 8 +- .../exec/store/TestParquetPhysicalPlan.java | 55 +++++- .../store/parquet/ParquetRecordReaderTest.java | 52 +++++- .../parquet_scan_union_screen_physical.json | 5 +- 9 files changed, 257 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java index d5a24b0..8c4b0b4 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java @@ -151,15 +151,12 @@ public class Wrapper { for (int i = start; i < start + width; i++) { endpoints.add(all.get(i % div)); } - } else if (values.size() < width) { - throw new NotImplementedException( - "Haven't implemented a scenario where we have some node affinity but the affinity list is smaller than the expected width."); } else { // get nodes with highest affinity. Collections.sort(values); values = Lists.reverse(values); for (int i = 0; i < width; i++) { - endpoints.add(values.get(i).getEndpoint()); + endpoints.add(values.get(i%values.size()).getEndpoint()); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java index b4092cc..b341ea4 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/AffinityCalculator.java @@ -1,6 +1,7 @@ package org.apache.drill.exec.store; +import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableRangeMap; import com.google.common.collect.Range; import org.apache.drill.exec.store.parquet.ParquetGroupScan; @@ -13,6 +14,7 @@ import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.*; +import java.util.concurrent.TimeUnit; public class AffinityCalculator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AffinityCalculator.class); @@ -24,6 +26,7 @@ public class AffinityCalculator { String fileName; Collection<DrillbitEndpoint> endpoints; HashMap<String,DrillbitEndpoint> endPointMap; + Stopwatch watch = new Stopwatch(); public AffinityCalculator(String fileName, FileSystem fs, Collection<DrillbitEndpoint> endpoints) { this.fs = fs; @@ -33,16 +36,20 @@ public class AffinityCalculator { buildEndpointMap(); } + /** + * Builds a mapping of block locations to file byte range + */ private void buildBlockMap() { try { + watch.start(); FileStatus file = fs.getFileStatus(new Path(fileName)); - long tC = System.nanoTime(); blocks = fs.getFileBlockLocations(file, 0 , file.getLen()); - long tD = System.nanoTime(); + watch.stop(); logger.debug("Block locations: {}", blocks); - logger.debug("Took {} ms to get Block locations", (float)(tD - tC) / 1e6); + logger.debug("Took {} ms to get Block locations", watch.elapsed(TimeUnit.MILLISECONDS)); } catch (IOException ioe) { throw new RuntimeException(ioe); } - long tA = System.nanoTime(); + watch.reset(); + watch.start(); ImmutableRangeMap.Builder<Long, BlockLocation> blockMapBuilder = new ImmutableRangeMap.Builder<Long,BlockLocation>(); for (BlockLocation block : blocks) { long start = block.getOffset(); @@ -51,62 +58,72 @@ public class AffinityCalculator { blockMapBuilder = blockMapBuilder.put(range, block); } blockMap = blockMapBuilder.build(); - long tB = System.nanoTime(); - logger.debug("Took {} ms to build block map", (float)(tB - tA) / 1e6); + watch.stop(); + logger.debug("Took {} ms to build block map", watch.elapsed(TimeUnit.MILLISECONDS)); } /** + * For a given RowGroup, calculate how many bytes are available on each on drillbit endpoint * - * @param entry + * @param rowGroup the RowGroup to calculate endpoint bytes for */ - public void setEndpointBytes(ParquetGroupScan.RowGroupInfo entry) { - long tA = System.nanoTime(); + public void setEndpointBytes(ParquetGroupScan.RowGroupInfo rowGroup) { + watch.reset(); + watch.start(); HashMap<String,Long> hostMap = new HashMap<>(); - long start = entry.getStart(); - long end = start + entry.getLength(); - Range<Long> entryRange = Range.closedOpen(start, end); - ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(entryRange); - for (Map.Entry<Range<Long>,BlockLocation> e : subRangeMap.asMapOfRanges().entrySet()) { - String[] hosts = null; - Range<Long> blockRange = e.getKey(); + HashMap<DrillbitEndpoint,Long> endpointByteMap = new HashMap(); + long start = rowGroup.getStart(); + long end = start + rowGroup.getLength(); + Range<Long> rowGroupRange = Range.closedOpen(start, end); + + // Find submap of ranges that intersect with the rowGroup + ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange); + + // Iterate through each block in this submap and get the host for the block location + for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) { + String[] hosts; + Range<Long> blockRange = block.getKey(); try { - hosts = e.getValue().getHosts(); - } catch (IOException ioe) { /*TODO Handle this exception */} - Range<Long> intersection = entryRange.intersection(blockRange); + hosts = block.getValue().getHosts(); + } catch (IOException ioe) { + throw new RuntimeException("Failed to get hosts for block location", ioe); + } + Range<Long> intersection = rowGroupRange.intersection(blockRange); long bytes = intersection.upperEndpoint() - intersection.lowerEndpoint(); + + // For each host in the current block location, add the intersecting bytes to the corresponding endpoint for (String host : hosts) { - if (hostMap.containsKey(host)) { - hostMap.put(host, hostMap.get(host) + bytes); + DrillbitEndpoint endpoint = getDrillBitEndpoint(host); + if (endpointByteMap.containsKey(endpoint)) { + endpointByteMap.put(endpoint, endpointByteMap.get(endpoint) + bytes); } else { - hostMap.put(host, bytes); + if (endpoint != null ) endpointByteMap.put(endpoint, bytes); } } } - HashMap<DrillbitEndpoint,Long> ebs = new HashMap(); - try { - for (Map.Entry<String,Long> hostEntry : hostMap.entrySet()) { - String host = hostEntry.getKey(); - Long bytes = hostEntry.getValue(); - DrillbitEndpoint d = getDrillBitEndpoint(host); - if (d != null ) ebs.put(d, bytes); - } - } catch (NullPointerException n) {} - entry.setEndpointBytes(ebs); - long tB = System.nanoTime(); - logger.debug("Took {} ms to set endpoint bytes", (float)(tB - tA) / 1e6); + + rowGroup.setEndpointBytes(endpointByteMap); + rowGroup.setMaxBytes(endpointByteMap.size() > 0 ? Collections.max(endpointByteMap.values()) : 0); + logger.debug("Row group ({},{}) max bytes {}", rowGroup.getPath(), rowGroup.getStart(), rowGroup.getMaxBytes()); + watch.stop(); + logger.debug("Took {} ms to set endpoint bytes", watch.elapsed(TimeUnit.MILLISECONDS)); } private DrillbitEndpoint getDrillBitEndpoint(String hostName) { return endPointMap.get(hostName); } + /** + * Builds a mapping of drillbit endpoints to hostnames + */ private void buildEndpointMap() { - long tA = System.nanoTime(); + watch.reset(); + watch.start(); endPointMap = new HashMap<String, DrillbitEndpoint>(); for (DrillbitEndpoint d : endpoints) { String hostName = d.getAddress(); endPointMap.put(hostName, d); } - long tB = System.nanoTime(); - logger.debug("Took {} ms to build endpoint map", (float)(tB - tA) / 1e6); + watch.stop(); + logger.debug("Took {} ms to build endpoint map", watch.elapsed(TimeUnit.MILLISECONDS)); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 9e48d33..64ced87 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -18,14 +18,13 @@ package org.apache.drill.exec.store.parquet; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; +import java.util.*; +import java.util.concurrent.TimeUnit; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.exception.SetupException; import org.apache.drill.exec.physical.EndpointAffinity; @@ -59,8 +58,9 @@ import com.google.common.base.Preconditions; public class ParquetGroupScan extends AbstractGroupScan { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetGroupScan.class); - private LinkedList<ParquetRowGroupScan.RowGroupReadEntry>[] mappings; + private ArrayListMultimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> mappings; private List<RowGroupInfo> rowGroupInfos; + private Stopwatch watch = new Stopwatch(); public List<ReadEntryWithPath> getEntries() { return entries; @@ -110,16 +110,14 @@ public class ParquetGroupScan extends AbstractGroupScan { } private void readFooter() throws IOException { - long tA = System.nanoTime(); + watch.reset(); + watch.start(); rowGroupInfos = new ArrayList(); long start = 0, length = 0; ColumnChunkMetaData columnChunkMetaData; for (ReadEntryWithPath readEntryWithPath : entries){ Path path = new Path(readEntryWithPath.getPath()); ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), path); -// FileSystem fs = FileSystem.get(this.storageEngine.getHadoopConfig()); -// FileStatus status = fs.getFileStatus(path); -// ParquetMetadata footer = ParquetFileReader.readFooter(this.storageEngine.getHadoopConfig(), status); readEntryWithPath.getPath(); int i = 0; @@ -138,38 +136,21 @@ public class ParquetGroupScan extends AbstractGroupScan { i++; } } - long tB = System.nanoTime(); - logger.debug("Took {} ms to get row group infos", (float)(tB - tA) / 1E6); + watch.stop(); + logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS)); } private void calculateEndpointBytes() { - long tA = System.nanoTime(); + watch.reset(); + watch.start(); AffinityCalculator ac = new AffinityCalculator(fileName, fs, availableEndpoints); for (RowGroupInfo e : rowGroupInfos) { ac.setEndpointBytes(e); totalBytes += e.getLength(); } - long tB = System.nanoTime(); - logger.debug("Took {} ms to calculate EndpointBytes", (float)(tB - tA) / 1E6); + watch.stop(); + logger.debug("Took {} ms to calculate EndpointBytes", watch.elapsed(TimeUnit.MILLISECONDS)); } -/* - public LinkedList<RowGroupInfo> getRowGroups() { - return rowGroups; - } - - public void setRowGroups(LinkedList<RowGroupInfo> rowGroups) { - this.rowGroups = rowGroups; - } - - public static class ParquetFileReadEntry { - - String path; - - public ParquetFileReadEntry(@JsonProperty String path){ - this.path = path; - } - } - */ @JsonIgnore public FileSystem getFileSystem() { @@ -232,16 +213,22 @@ public class ParquetGroupScan extends AbstractGroupScan { } } + /** + *Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each + * rowGroup + * @return a list of EndpointAffinity objects + */ @Override public List<EndpointAffinity> getOperatorAffinity() { - long tA = System.nanoTime(); + watch.reset(); + watch.start(); if (this.endpointAffinities == null) { HashMap<DrillbitEndpoint, Float> affinities = new HashMap<>(); for (RowGroupInfo entry : rowGroupInfos) { for (DrillbitEndpoint d : entry.getEndpointBytes().keySet()) { long bytes = entry.getEndpointBytes().get(d); float affinity = (float)bytes / (float)totalBytes; - logger.error("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes); + logger.debug("RowGroup: {} Endpoint: {} Bytes: {}", entry.getRowGroupIndex(), d.getAddress(), bytes); if (affinities.keySet().contains(d)) { affinities.put(d, affinities.get(d) + affinity); } else { @@ -256,83 +243,90 @@ public class ParquetGroupScan extends AbstractGroupScan { } this.endpointAffinities = affinityList; } - long tB = System.nanoTime(); - logger.debug("Took {} ms to get operator affinity", (float)(tB - tA) / 1E6); + watch.stop(); + logger.debug("Took {} ms to get operator affinity", watch.elapsed(TimeUnit.MILLISECONDS)); return this.endpointAffinities; } + static final double[] ASSIGNMENT_CUTOFFS = {0.99, 0.50, 0.25, 0.01}; - + /** + * + * @param incomingEndpoints + */ @Override - public void applyAssignments(List<DrillbitEndpoint> endpoints) { - long tA = System.nanoTime(); - Preconditions.checkArgument(endpoints.size() <= rowGroupInfos.size()); - - int i = 0; - for (DrillbitEndpoint endpoint : endpoints) { - logger.debug("Endpoint index {}, endpoint host: {}", i++, endpoint.getAddress()); + public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) { + watch.reset(); + watch.start(); + Preconditions.checkArgument(incomingEndpoints.size() <= rowGroupInfos.size()); + mappings = ArrayListMultimap.create(); + ArrayList rowGroupList = new ArrayList(rowGroupInfos); + List<DrillbitEndpoint> endpointLinkedlist = Lists.newLinkedList(incomingEndpoints); + for(double cutoff : ASSIGNMENT_CUTOFFS ){ + scanAndAssign(mappings, endpointLinkedlist, rowGroupList, cutoff, false); } - - Collections.sort(rowGroupInfos, new ParquetReadEntryComparator()); - mappings = new LinkedList[endpoints.size()]; - LinkedList<RowGroupInfo> unassigned = scanAndAssign(endpoints, rowGroupInfos, 100, true, false); - LinkedList<RowGroupInfo> unassigned2 = scanAndAssign(endpoints, unassigned, 50, true, false); - LinkedList<RowGroupInfo> unassigned3 = scanAndAssign(endpoints, unassigned2, 25, true, false); - LinkedList<RowGroupInfo> unassigned4 = scanAndAssign(endpoints, unassigned3, 0, false, false); - LinkedList<RowGroupInfo> unassigned5 = scanAndAssign(endpoints, unassigned4, 0, false, true); - assert unassigned5.size() == 0 : String.format("All readEntries should be assigned by now, but some are still unassigned"); - long tB = System.nanoTime(); - logger.debug("Took {} ms to apply assignments ", (float)(tB - tA) / 1E6); + scanAndAssign(mappings, endpointLinkedlist, rowGroupList, 0.0, true); + watch.stop(); + logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS)); + Preconditions.checkArgument(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned"); + Preconditions.checkArgument(!rowGroupInfos.isEmpty()); } - private LinkedList<RowGroupInfo> scanAndAssign (List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, int requiredPercentage, boolean mustContain, boolean assignAll) { - Collections.sort(rowGroupInfos, new ParquetReadEntryComparator()); - LinkedList<RowGroupInfo> unassigned = new LinkedList<>(); - - int maxEntries = (int) (rowGroupInfos.size() / endpoints.size() * 1.5); + public int fragmentPointer = 0; + + /** + * + * @param endpointAssignments the mapping between fragment/endpoint and rowGroup + * @param endpoints the list of drillbits, ordered by the corresponding fragment + * @param rowGroups the list of rowGroups to assign + * @param requiredPercentage the percentage of max bytes required to make an assignment + * @param assignAll if true, will assign even if no affinity + */ + private void scanAndAssign (Multimap<Integer, ParquetRowGroupScan.RowGroupReadEntry> endpointAssignments, List<DrillbitEndpoint> endpoints, List<RowGroupInfo> rowGroups, double requiredPercentage, boolean assignAll) { + Collections.sort(rowGroups, new ParquetReadEntryComparator()); + final boolean requireAffinity = requiredPercentage > 0; + int maxAssignments = (int) (rowGroups.size() / endpoints.size()); + + if (maxAssignments < 1) maxAssignments = 1; + + for(Iterator<RowGroupInfo> iter = rowGroups.iterator(); iter.hasNext();){ + RowGroupInfo rowGroupInfo = iter.next(); + for (int i = 0; i < endpoints.size(); i++) { + int minorFragmentId = (fragmentPointer + i) % endpoints.size(); + DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId); + Map<DrillbitEndpoint, Long> bytesPerEndpoint = rowGroupInfo.getEndpointBytes(); + boolean haveAffinity = bytesPerEndpoint.containsKey(currentEndpoint) ; - if (maxEntries < 1) maxEntries = 1; - - int i =0; - for(RowGroupInfo e : rowGroups) { - boolean assigned = false; - for (int j = i; j < i + endpoints.size(); j++) { - DrillbitEndpoint currentEndpoint = endpoints.get(j%endpoints.size()); if (assignAll || - (e.getEndpointBytes().size() > 0 && - (e.getEndpointBytes().containsKey(currentEndpoint) || !mustContain) && - (mappings[j%endpoints.size()] == null || mappings[j%endpoints.size()].size() < maxEntries) && - e.getEndpointBytes().get(currentEndpoint) >= e.getMaxBytes() * requiredPercentage / 100)) { - LinkedList<ParquetRowGroupScan.RowGroupReadEntry> entries = mappings[j%endpoints.size()]; - if(entries == null){ - entries = new LinkedList<ParquetRowGroupScan.RowGroupReadEntry>(); - mappings[j%endpoints.size()] = entries; - } - entries.add(e.getRowGroupReadEntry()); - logger.debug("Assigned rowGroup ( {} , {} ) to endpoint {}", e.getPath(), e.getStart(), currentEndpoint.getAddress()); - assigned = true; + (!bytesPerEndpoint.isEmpty() && + (!requireAffinity || haveAffinity) && + (!endpointAssignments.containsKey(minorFragmentId) || endpointAssignments.get(minorFragmentId).size() < maxAssignments) && + bytesPerEndpoint.get(currentEndpoint) >= rowGroupInfo.getMaxBytes() * requiredPercentage)) { + + endpointAssignments.put(minorFragmentId, rowGroupInfo.getRowGroupReadEntry()); + logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(), minorFragmentId, endpoints.get(minorFragmentId).getAddress()); + iter.remove(); + fragmentPointer = (minorFragmentId + 1) % endpoints.size(); break; } } - if (!assigned) unassigned.add(e); - i++; + } - return unassigned; } @Override public ParquetRowGroupScan getSpecificScan(int minorFragmentId) { - assert minorFragmentId < mappings.length : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.length, minorFragmentId); - for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings[minorFragmentId]) { + assert minorFragmentId < mappings.size() : String.format("Mappings length [%d] should be longer than minor fragment id [%d] but it isn't.", mappings.size(), minorFragmentId); + for (ParquetRowGroupScan.RowGroupReadEntry rg : mappings.get(minorFragmentId)) { logger.debug("minorFragmentId: {} Path: {} RowGroupIndex: {}",minorFragmentId, rg.getPath(),rg.getRowGroupIndex()); } + Preconditions.checkArgument(!mappings.get(minorFragmentId).isEmpty(), String.format("MinorFragmentId %d has no read entries assigned", minorFragmentId)); try { - return new ParquetRowGroupScan(storageEngine, engineConfig, mappings[minorFragmentId]); + return new ParquetRowGroupScan(storageEngine, engineConfig, mappings.get(minorFragmentId)); } catch (SetupException e) { - e.printStackTrace(); // TODO - fix this + throw new RuntimeException("Error setting up ParquetRowGroupScan", e); } - return null; } @Override @@ -342,7 +336,8 @@ public class ParquetGroupScan extends AbstractGroupScan { @Override public OperatorCost getCost() { - return new OperatorCost(1,1,1,1); + //TODO Figure out how to properly calculate cost + return new OperatorCost(1,rowGroupInfos.size(),1,1); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java index 4e46034..3aaa987 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java @@ -211,8 +211,8 @@ public class ParquetRecordReader implements RecordReader { } for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) { output.addField(r.valueVecHolder.getValueVector()); - output.setNewSchema(); } + output.setNewSchema(); }catch(SchemaChangeException e) { throw new ExecutionSetupException("Error setting up output mutator.", e); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index 03fb4ec..addd288 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; +import java.util.concurrent.TimeUnit; +import com.google.common.base.Stopwatch; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; @@ -40,12 +42,12 @@ import parquet.hadoop.ParquetFileReader; import parquet.hadoop.metadata.ParquetMetadata; public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class); + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class); @Override public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException { - long tA = System.nanoTime(), tB; - System.out.println( new SimpleDateFormat("mm:ss S").format(new Date()) + " :Start of ScanBatCreator.scanBatch"); + Stopwatch watch = new Stopwatch(); + watch.start(); Preconditions.checkArgument(children.isEmpty()); List<RecordReader> readers = Lists.newArrayList(); for(ParquetRowGroupScan.RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){ @@ -68,7 +70,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan throw new ExecutionSetupException(e1); } } - System.out.println( "Total time in method: " + ((float) (System.nanoTime() - tA) / 1e9)); + logger.debug("total time in ScanBatchCreator.getBatch: {} ms", watch.elapsed(TimeUnit.MILLISECONDS)); return new ScanBatch(context, readers.iterator()); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java index 9a33109..72c5f34 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/ErrorHelper.java @@ -35,8 +35,8 @@ public class ErrorHelper { if(message != null){ sb.append(message); } - - do{ + + while (true) { sb.append(" < "); sb.append(t.getClass().getSimpleName()); if(t.getMessage() != null){ @@ -44,7 +44,9 @@ public class ErrorHelper { sb.append(t.getMessage()); sb.append(" ]"); } - }while(t.getCause() != null && t.getCause() != t); + if (t.getCause() == null || t.getCause() == t) break; + t = t.getCause(); + } builder.setMessage(sb.toString()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java index e2a00f1..18ac294 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java @@ -13,10 +13,16 @@ import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.UserProtos; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.rpc.user.UserResultsListener; +import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.store.parquet.ParquetGroupScan; +import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -29,6 +35,7 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.CountDownLatch; import static junit.framework.Assert.assertNull; import static org.junit.Assert.assertEquals; @@ -38,6 +45,8 @@ public class TestParquetPhysicalPlan { //public String fileName = "/physical_test2.json"; public String fileName = "parquet_scan_union_screen_physical.json"; +// public String fileName = "parquet-sample.json"; + @Test @Ignore @@ -49,7 +58,51 @@ public class TestParquetPhysicalPlan { bit1.run(); client.connect(); List<QueryResultBatch> results = client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8)); - System.out.println(String.format("Got %d results", results.size())); + RecordBatchLoader loader = new RecordBatchLoader(bit1.getContext().getAllocator()); + for (QueryResultBatch b : results) { + System.out.println(String.format("Got %d results", b.getHeader().getRowCount())); + loader.load(b.getHeader().getDef(), b.getData()); + for (VectorWrapper vw : loader) { + System.out.println(vw.getValueVector().getField().getName()); + ValueVector vv = vw.getValueVector(); + for (int i = 0; i < vv.getAccessor().getValueCount(); i++) { + Object o = vv.getAccessor().getObject(i); + System.out.println(vv.getAccessor().getObject(i)); + } + } + } + client.close(); + } + } + + private class ParquetResultsListener implements UserResultsListener { + private CountDownLatch latch = new CountDownLatch(1); + @Override + public void submissionFailed(RpcException ex) { + logger.error("submission failed", ex); + latch.countDown(); + } + + @Override + public void resultArrived(QueryResultBatch result) { + System.out.printf("Result batch arrived. Number of records: %d", result.getHeader().getRowCount()); + if (result.getHeader().getIsLastChunk()) latch.countDown(); + } + + public void await() throws Exception { + latch.await(); + } + } + @Test + @Ignore + public void testParseParquetPhysicalPlanRemote() throws Exception { + DrillConfig config = DrillConfig.create(); + + try(DrillClient client = new DrillClient(config);){ + client.connect(); + ParquetResultsListener listener = new ParquetResultsListener(); + client.runQuery(UserProtos.QueryType.PHYSICAL, Resources.toString(Resources.getResource(fileName),Charsets.UTF_8), listener); + listener.await(); client.close(); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 1d91455..7a99c3f 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -48,6 +48,7 @@ import org.apache.drill.exec.store.parquet.TestFileGenerator.FieldInfo; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.ValueVector; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import parquet.bytes.BytesInput; @@ -68,6 +69,7 @@ public class ParquetRecordReaderTest { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReaderTest.class); private boolean VERBOSE_DEBUG = false; + private boolean checkValues = true; static final int numberRowGroups = 20; static final int recordsPerRowGroup = 300000; @@ -100,6 +102,20 @@ public class ParquetRecordReaderTest { testParquetFullEngineLocalText(planText, fileName, i, numberRowGroups, recordsPerRowGroup); } + @Test + @Ignore + public void testLocalDistributed() throws Exception { + String planName = "/parquet/parquet_scan_union_screen_physical.json"; + testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, 300000); + } + + @Test + @Ignore + public void testRemoteDistributed() throws Exception { + String planName = "/parquet/parquet_scan_union_screen_physical.json"; + testParquetFullEngineRemote(planName, fileName, 1, 10, 30000); + } + private class ParquetResultListener implements UserResultsListener { private SettableFuture<Void> future = SettableFuture.create(); @@ -155,8 +171,12 @@ public class ParquetRecordReaderTest { if (VERBOSE_DEBUG){ System.out.print(vv.getAccessor().getObject(j) + ", " + (j % 25 == 0 ? "\n batch:" + batchCounter + " v:" + j + " - " : "")); } - assertField(vv, j, (TypeProtos.MinorType) currentField.type, - currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/"); + if (checkValues) { + try { + assertField(vv, j, (TypeProtos.MinorType) currentField.type, + currentField.values[(int) (columnValCounter % 3)], (String) currentField.name + "/"); + } catch (AssertionError e) { submissionFailed(new RpcException(e)); } + } columnValCounter++; } if (VERBOSE_DEBUG){ @@ -197,7 +217,9 @@ public class ParquetRecordReaderTest { batchCounter++; if(result.getHeader().getIsLastChunk()){ for (String s : valuesChecked.keySet()) { + try { assertEquals("Record count incorrect for column: " + s, totalRecords, (long) valuesChecked.get(s)); + } catch (AssertionError e) { submissionFailed(new RpcException(e)); } } assert valuesChecked.keySet().size() > 0; @@ -222,11 +244,13 @@ public class ParquetRecordReaderTest { DrillConfig config = DrillConfig.create(); + checkValues = false; + try(DrillClient client = new DrillClient(config);){ client.connect(); RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator()); ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead); - client.runQuery(UserProtos.QueryType.LOGICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener); + client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(plan), Charsets.UTF_8), resultListener); resultListener.get(); } @@ -259,6 +283,28 @@ public class ParquetRecordReaderTest { } + //use this method to submit physical plan + public void testParquetFullEngineLocalTextDistributed(String planName, String filename, int numberOfTimesRead /* specified in json plan */, int numberOfRowGroups, int recordsPerRowGroup) throws Exception{ + + RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + + checkValues = false; + + DrillConfig config = DrillConfig.create(); + + try(Drillbit bit1 = new Drillbit(config, serviceSet); DrillClient client = new DrillClient(config, serviceSet.getCoordinator());){ + bit1.run(); + client.connect(); + RecordBatchLoader batchLoader = new RecordBatchLoader(client.getAllocator()); + ParquetResultListener resultListener = new ParquetResultListener(recordsPerRowGroup, batchLoader, numberOfRowGroups, numberOfTimesRead); + Stopwatch watch = new Stopwatch().start(); + client.runQuery(UserProtos.QueryType.PHYSICAL, Files.toString(FileUtils.getResourceAsFile(planName), Charsets.UTF_8), resultListener); + resultListener.get(); + System.out.println(String.format("Took %d ms to run query", watch.elapsed(TimeUnit.MILLISECONDS))); + + } + + } public String pad(String value, int length) { return pad(value, length, " "); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/7edd3617/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json index f508d09..5efecaf 100644 --- a/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json +++ b/sandbox/prototype/exec/java-exec/src/test/resources/parquet/parquet_scan_union_screen_physical.json @@ -11,10 +11,7 @@ @id : 1, entries : [ { - path : "/tmp/testParquetFile_many_types_3" - }, - { - path : "/tmp/testParquetFile_many_types_3" + path : "/tmp/parquet_test_file_many_types" } ], storageengine:{
