This is an automated email from the ASF dual-hosted git repository.

boaz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 18766950c640c6963ffd1c94224c4a984bedd3c1
Author: Arina Ielchiieva <[email protected]>
AuthorDate: Wed May 23 16:17:11 2018 +0000

    DRILL-6442: Adjust Hbase disk cost & row count estimation when filter push down is applied

    closes #1288
---
 .../drill/exec/store/hbase/HBaseGroupScan.java     | 64 ++++++++++-----------
 .../exec/store/hbase/TableStatsCalculator.java     | 67 ++++++++++++----------
 2 files changed, 66 insertions(+), 65 deletions(-)

diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 9eeba24..97c9a95 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -63,20 +65,13 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 @JsonTypeName("hbase-scan")
 public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConstants {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseGroupScan.class);
 
-  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<HBaseSubScanSpec>>() {
-    @Override
-    public int compare(List<HBaseSubScanSpec> list1, List<HBaseSubScanSpec> list2) {
-      return list1.size() - list2.size();
-    }
-  };
+  private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR = (list1, list2) -> list1.size() - list2.size();
 
   private static final Comparator<List<HBaseSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
 
@@ -182,12 +177,12 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
   public List<EndpointAffinity> getOperatorAffinity() {
     watch.reset();
     watch.start();
-    Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>();
+    Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
     for (DrillbitEndpoint ep : storagePlugin.getContext().getBits()) {
       endpointMap.put(ep.getAddress(), ep);
     }
 
-    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
+    Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
     for (ServerName sn : regionsToScan.values()) {
       DrillbitEndpoint ep = endpointMap.get(sn.getHostname());
       if (ep != null) {
@@ -199,14 +194,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
         }
       }
     }
-    logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS)/1000);
-    return Lists.newArrayList(affinityMap.values());
+    logger.debug("Took {} µs to get operator affinity", watch.elapsed(TimeUnit.NANOSECONDS) / 1000);
+    return new ArrayList<>(affinityMap.values());
   }
 
-  /**
-   *
-   * @param incomingEndpoints
-   */
   @Override
   public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
     watch.reset();
@@ -230,23 +221,23 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     /*
      * another map with endpoint (hostname => corresponding index list) in 'incomingEndpoints' list
      */
-    Map<String, Queue<Integer>> endpointHostIndexListMap = Maps.newHashMap();
+    Map<String, Queue<Integer>> endpointHostIndexListMap = new HashMap<>();
 
     /*
     * Initialize these two maps
     */
     for (int i = 0; i < numSlots; ++i) {
-      endpointFragmentMapping.put(i, new ArrayList<HBaseSubScanSpec>(maxPerEndpointSlot));
+      endpointFragmentMapping.put(i, new ArrayList<>(maxPerEndpointSlot));
       String hostname = incomingEndpoints.get(i).getAddress();
       Queue<Integer> hostIndexQueue = endpointHostIndexListMap.get(hostname);
       if (hostIndexQueue == null) {
-        hostIndexQueue = Lists.newLinkedList();
+        hostIndexQueue = new LinkedList<>();
         endpointHostIndexListMap.put(hostname, hostIndexQueue);
       }
       hostIndexQueue.add(i);
     }
 
-    Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = Sets.newHashSet(regionsToScan.entrySet());
+    Set<Entry<HRegionInfo, ServerName>> regionsToAssignSet = new HashSet<>(regionsToScan.entrySet());
 
     /*
      * First, we assign regions which are hosted on region servers running on drillbit endpoints
@@ -256,13 +247,13 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
       /*
       * Test if there is a drillbit endpoint which is also an HBase RegionServer that hosts the current HBase region
       */
-      Queue<Integer> endpointIndexlist = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
-      if (endpointIndexlist != null) {
-        Integer slotIndex = endpointIndexlist.poll();
+      Queue<Integer> endpointIndexList = endpointHostIndexListMap.get(regionEntry.getValue().getHostname());
+      if (endpointIndexList != null) {
+        Integer slotIndex = endpointIndexList.poll();
         List<HBaseSubScanSpec> endpointSlotScanList = endpointFragmentMapping.get(slotIndex);
         endpointSlotScanList.add(regionInfoToSubScanSpec(regionEntry.getKey()));
         // add to the tail of the slot list, to add more later in round robin fashion
-        endpointIndexlist.offer(slotIndex);
+        endpointIndexList.offer(slotIndex);
        // this region has been assigned
        regionsIterator.remove();
       }
     }
@@ -271,8 +262,8 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     /*
     * Build priority queues of slots, with ones which has tasks lesser than 'minPerEndpointSlot' and another which have more.
     */
-    PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR);
-    PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<List<HBaseSubScanSpec>>(numSlots, LIST_SIZE_COMPARATOR_REV);
+    PriorityQueue<List<HBaseSubScanSpec>> minHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR);
+    PriorityQueue<List<HBaseSubScanSpec>> maxHeap = new PriorityQueue<>(numSlots, LIST_SIZE_COMPARATOR_REV);
     for(List<HBaseSubScanSpec> listOfScan : endpointFragmentMapping.values()) {
       if (listOfScan.size() < minPerEndpointSlot) {
         minHeap.offer(listOfScan);
@@ -310,12 +301,11 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
     }
 
     /* no slot should be empty at this point */
-    assert (minHeap.peek() == null || minHeap.peek().size() > 0) : String.format(
-        "Unable to assign tasks to some endpoints.\nEndpoints: {}.\nAssignment Map: {}.",
-        incomingEndpoints, endpointFragmentMapping.toString());
+    assert (minHeap.peek() == null || minHeap.peek().size() > 0) :
+        String.format("Unable to assign tasks to some endpoints.\nEndpoints: %s.\nAssignment Map: %s.", incomingEndpoints, endpointFragmentMapping.toString());
 
     logger.debug("Built assignment map in {} µs.\nEndpoints: {}.\nAssignment Map: {}",
-        watch.elapsed(TimeUnit.NANOSECONDS)/1000, incomingEndpoints, endpointFragmentMapping.toString());
+        watch.elapsed(TimeUnit.NANOSECONDS) / 1000, incomingEndpoints, endpointFragmentMapping.toString());
   }
 
   private HBaseSubScanSpec regionInfoToSubScanSpec(HRegionInfo ri) {
@@ -347,9 +337,15 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
 
   @Override
   public ScanStats getScanStats() {
-    long rowCount = (long) ((scanSizeInBytes / statsCalculator.getAvgRowSizeInBytes()) * (hbaseScanSpec.getFilter() != null ? 0.5 : 1));
-    // the following calculation is not precise since 'columns' could specify CFs while getColsPerRow() returns the number of qualifier.
-    float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) ? 1 : columns.size()/statsCalculator.getColsPerRow());
+    long rowCount = scanSizeInBytes / statsCalculator.getAvgRowSizeInBytes();
+    // the following calculation is not precise since 'columns' could specify CFs while getColsPerRow() returns the number of qualifier
+    float diskCost = scanSizeInBytes * ((columns == null || columns.isEmpty()) ? 1 : columns.size() / statsCalculator.getColsPerRow());
+    // if filter push down is used, reduce estimated row count and disk cost by half to ensure plan cost will be less then without it
+    if (hbaseScanSpec.getFilter() != null) {
+      rowCount = (long) (rowCount * 0.5);
+      // if during sampling we found out exact row count, no need to reduce number of rows
+      diskCost = statsCalculator.usedDefaultRowCount() ? diskCost * 0.5F : diskCost;
+    }
     return new ScanStats(GroupScanProperty.NO_EXACT_ROW_COUNT, rowCount, 1, diskCost);
   }
 
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
index 379fb7c..b435fbd 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/TableStatsCalculator.java
@@ -48,36 +48,36 @@ import org.apache.hadoop.hbase.util.Bytes;
  * Computes size of each region for given table.
  */
 public class TableStatsCalculator {
-  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TableStatsCalculator.class);
 
-  public static final long DEFAULT_ROW_COUNT = 1024L * 1024L;
+  public static final long DEFAULT_ROW_COUNT = 1024L * 1024L; // 1 million rows
 
   private static final String DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT = "drill.exec.hbase.scan.samplerows.count";
 
   private static final int DEFAULT_SAMPLE_SIZE = 100;
 
-  /**
-   * Maps each region to its size in bytes.
-   */
+  // Maps each region to its size in bytes.
   private Map<byte[], Long> sizeMap = null;
 
   private int avgRowSizeInBytes = 1;
 
   private int colsPerRow = 1;
 
+  private long estimatedRowCount = DEFAULT_ROW_COUNT;
+
   /**
    * Computes size of each region for table.
    *
-   * @param conn
-   * @param hbaseScanSpec
-   * @param config
-   * @throws IOException
+   * @param connection connection to Hbase client
+   * @param hbaseScanSpec scan specification
+   * @param config drill configuration
+   * @param storageConfig Hbase storage configuration
    */
-  public TableStatsCalculator(Connection conn, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException {
+  public TableStatsCalculator(Connection connection, HBaseScanSpec hbaseScanSpec, DrillConfig config, HBaseStoragePluginConfig storageConfig) throws IOException {
     TableName tableName = TableName.valueOf(hbaseScanSpec.getTableName());
-    try (Admin admin = conn.getAdmin();
-         Table table = conn.getTable(tableName);
-         RegionLocator locator = conn.getRegionLocator(tableName)) {
+    try (Admin admin = connection.getAdmin();
+         Table table = connection.getTable(tableName);
+         RegionLocator locator = connection.getRegionLocator(tableName)) {
       int rowsToSample = rowsToSample(config);
       if (rowsToSample > 0) {
         Scan scan = new Scan(hbaseScanSpec.getStartRow(), hbaseScanSpec.getStopRow());
@@ -100,22 +100,25 @@ public class TableStatsCalculator {
           }
         }
         if (rowCount > 0) {
-          avgRowSizeInBytes = (int) (rowSizeSum/rowCount);
-          colsPerRow = numColumnsSum/rowCount;
+          avgRowSizeInBytes = (int) (rowSizeSum / rowCount);
+          colsPerRow = numColumnsSum / rowCount;
+          // if during sampling we receive less rows than expected, then we can use this number instead of default
+          estimatedRowCount = rowCount == rowsToSample ? estimatedRowCount : rowCount;
         }
+        scanner.close();
       }
 
       if (!enabled(storageConfig)) {
-        logger.info("Region size calculation disabled.");
+        logger.debug("Region size calculation is disabled.");
         return;
       }
 
-      logger.info("Calculating region sizes for table '{}'.", tableName.getNameAsString());
+      logger.debug("Calculating region sizes for table '{}'.", tableName.getNameAsString());
 
-      //get regions for table
+      // get regions for table
       List<HRegionLocation> tableRegionInfos = locator.getAllRegionLocations();
-      Set<byte[]> tableRegions = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+      Set<byte[]> tableRegions = new TreeSet<>(Bytes.BYTES_COMPARATOR);
       for (HRegionLocation regionInfo : tableRegionInfos) {
         tableRegions.add(regionInfo.getRegionInfo().getRegionName());
       }
@@ -124,17 +127,17 @@ public class TableStatsCalculator {
      try {
        clusterStatus = admin.getClusterStatus();
      } catch (Exception e) {
-        logger.debug(e.getMessage());
+        logger.debug(e.getMessage(), e);
      } finally {
        if (clusterStatus == null) {
          return;
        }
      }
 
-      sizeMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      sizeMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
 
      Collection<ServerName> servers = clusterStatus.getServers();
-      //iterate all cluster regions, filter regions from our table and compute their size
+      // iterate all cluster regions, filter regions from our table and compute their size
      for (ServerName serverName : servers) {
        ServerLoad serverLoad = clusterStatus.getLoad(serverName);
 
@@ -143,14 +146,12 @@ public class TableStatsCalculator {
          if (tableRegions.contains(regionId)) {
            long regionSizeMB = regionLoad.getMemStoreSizeMB() + regionLoad.getStorefileSizeMB();
-            sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * (1024*1024));
-            if (logger.isDebugEnabled()) {
-              logger.debug("Region " + regionLoad.getNameAsString() + " has size " + regionSizeMB + "MB");
-            }
+            sizeMap.put(regionId, (regionSizeMB > 0 ? regionSizeMB : 1) * estimatedRowCount);
+            logger.debug("Region {} has size {} MB.", regionLoad.getNameAsString(), regionSizeMB);
          }
        }
      }
 
-      logger.debug("Region sizes calculated");
+      logger.debug("Region sizes calculated.");
    }
  }
 
@@ -160,8 +161,8 @@ public class TableStatsCalculator {
   }
 
   private int rowsToSample(DrillConfig config) {
-    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT)
-        ? config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : DEFAULT_SAMPLE_SIZE;
+    return config.hasPath(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) ?
+        config.getInt(DRILL_EXEC_HBASE_SCAN_SAMPLE_ROWS_COUNT) : DEFAULT_SAMPLE_SIZE;
   }
 
   /**
@@ -169,11 +170,11 @@
    */
   public long getRegionSizeInBytes(byte[] regionId) {
     if (sizeMap == null) {
-      return (long) avgRowSizeInBytes * DEFAULT_ROW_COUNT; // 1 million rows
+      return (long) avgRowSizeInBytes * estimatedRowCount;
     } else {
       Long size = sizeMap.get(regionId);
       if (size == null) {
-        logger.debug("Unknown region:" + Arrays.toString(regionId));
+        logger.debug("Unknown region: {}.", Arrays.toString(regionId));
        return 0;
      } else {
        return size;
      }
@@ -189,4 +190,8 @@
     return colsPerRow;
   }
 
+  public boolean usedDefaultRowCount() {
+    return estimatedRowCount == DEFAULT_ROW_COUNT;
+  }
+
 }
-- 
To stop receiving notification emails like this one, please contact
[email protected].
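
For readers who want to see the effect of the new cost adjustment in isolation, below is a small, self-contained Java sketch. It is not Drill code: the class name, method names, and sample numbers are invented for illustration, and the column ratio uses floating-point division for readability where the committed expression keeps the original integer types. Only the arithmetic mirrors the patched getScanStats(): the row count estimate is halved whenever a filter is pushed down, while the disk cost is halved only if the row count is still the default guess rather than a value derived from sampling.

// Hypothetical stand-alone illustration; only the arithmetic mirrors the committed getScanStats().
public class ScanStatsSketch {

  // Row count estimate: scan size divided by the sampled average row size,
  // halved when a filter has been pushed down into the scan.
  static long estimateRowCount(long scanSizeInBytes, int avgRowSizeInBytes, boolean filterPushedDown) {
    long rowCount = scanSizeInBytes / avgRowSizeInBytes;
    return filterPushedDown ? (long) (rowCount * 0.5) : rowCount;
  }

  // Disk cost estimate: scan size scaled by the fraction of columns projected,
  // halved only when a filter is pushed down AND the row count is still the default estimate.
  static float estimateDiskCost(long scanSizeInBytes, int projectedColumns, int colsPerRow,
                                boolean filterPushedDown, boolean usedDefaultRowCount) {
    float diskCost = projectedColumns == 0
        ? scanSizeInBytes
        : scanSizeInBytes * ((float) projectedColumns / colsPerRow);
    if (filterPushedDown && usedDefaultRowCount) {
      diskCost *= 0.5F;
    }
    return diskCost;
  }

  public static void main(String[] args) {
    long scanSize = 64L * 1024 * 1024; // 64 MB of region data to scan (made-up figure)
    int avgRowSize = 128;              // sampled average row size in bytes (made-up figure)
    System.out.println(estimateRowCount(scanSize, avgRowSize, true));  // 262144
    System.out.println(estimateDiskCost(scanSize, 2, 4, true, true));  // 1.6777216E7
  }
}

The asymmetry follows the comments in the commit itself: halving the row count keeps a plan with filter push down cheaper than the same scan without it, while the disk-cost discount is skipped once sampling has produced an exact row count, since that figure already reflects the table's real size.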
