Repository: incubator-trafodion Updated Branches: refs/heads/master cae7aba40 -> 536c51d54
[TRAFODION-1421] Implement parallel scanner primitive. New CQD: hbase_dop_parallel_scanner - default 0.0 = disabled - from 0.x to 1.0 -> the DOP is dynamically assessed as a percentage of the regions to scan. If there are 10 regions and the CQD is .3 (30%), then 3 threads will be used... - from 2.0 and above (in practice, any value above 1.0), the ceiling is taken and used as a fixed number of threads. The advantage of parallel scanner parallelism over ESP parallelism is resource consumption: the parallel scanner just creates threads, while ESPs are full processes with high memory consumption. For now, this feature is just a primitive that can be manually exercised. The next obvious step is to make the compiler aware of it and pick it over ESP DOP when appropriate... Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/a7f9a628 Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/a7f9a628 Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/a7f9a628 Branch: refs/heads/master Commit: a7f9a628179fcd4077a566f558e653b1380eabc6 Parents: b7ecf18 Author: Eric Owhadi <[email protected]> Authored: Mon Apr 11 16:59:23 2016 +0000 Committer: Eric Owhadi <[email protected]> Committed: Mon Apr 11 16:59:23 2016 +0000 ---------------------------------------------------------------------- .../hbase/client/ClientScanner98.java.tmpl | 490 +++++++++++++++++++ .../client/TrafParallelClientScanner.java.tmpl | 325 ++++++++++++ .../hbase/client/transactional/RMInterface.java | 4 +- .../transactional/SsccTransactionalTable.java | 8 +- .../transactional/TransactionalTable.java | 8 +- .../transactional/TransactionalTableClient.java | 3 +- core/sql/comexe/ComTdbHbaseAccess.h | 4 + core/sql/executor/ExHbaseIUD.cpp | 6 +- core/sql/executor/ExHbaseSelect.cpp | 3 + core/sql/executor/HBaseClient_JNI.cpp | 5 +- core/sql/executor/HBaseClient_JNI.h | 1 + core/sql/exp/ExpHbaseInterface.cpp | 4 +- core/sql/exp/ExpHbaseInterface.h | 2 +
core/sql/generator/GenExplain.cpp | 7 + core/sql/generator/GenRelScan.cpp | 1 + core/sql/generator/Generator.cpp | 3 + core/sql/generator/Generator.h | 2 +- core/sql/regress/executor/EXPECTED140 | 435 +++++++++++++++- core/sql/regress/executor/TEST140 | 28 +- core/sql/sqlcomp/DefaultConstants.h | 4 + core/sql/sqlcomp/nadefaults.cpp | 2 +- .../java/org/trafodion/sql/HTableClient.java | 7 +- 22 files changed, 1318 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl new file mode 100644 index 0000000..81cec3f --- /dev/null +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl @@ -0,0 +1,490 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all, one region after the other.
+ * <p>
+ * Used by {@link TrafParallelClientScanner} to scan one key range per worker thread.
+ * Unlike a stock client scanner, an expired scanner lease is never reported to the caller:
+ * the scanner is reset and the scan resumes (see {@link #next()}).
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ClientScanner98 extends AbstractClientScanner {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+  protected Scan scan;
+  protected boolean closed = false;
+  // Current region scanner is against. Gets cleared if current region goes
+  // wonky: e.g. if it splits on us.
+  protected HRegionInfo currentRegion = null;
+  protected ScannerCallable callable = null;
+  protected final LinkedList<Result> cache = new LinkedList<Result>();
+  protected final int caching;
+  protected long lastNext;
+  // Keep lastResult returned successfully in case we have to reset scanner.
+  protected Result lastResult = null;
+  protected final long maxScannerResultSize;
+  private final HConnection connection;
+  private final TableName tableName;
+  protected final int scannerTimeout;
+  protected boolean scanMetricsPublished = false;
+  protected RpcRetryingCaller<Result []> caller;
+  protected RpcControllerFactory rpcControllerFactory;
+
+  /**
+   * Create a new ClientScanner for the specified table. An HConnection will be
+   * retrieved using the passed Configuration.
+   * Note that the passed {@link Scan}'s start row may be changed.
+   *
+   * @param conf The {@link Configuration} to use.
+   * @param scan {@link Scan} to use in this scanner
+   * @param tableName The table that we wish to scan
+   * @throws IOException if the scanner on the first region cannot be opened
+   * @deprecated Pass an explicit connection with
+   *   {@link #ClientScanner98(Configuration, Scan, TableName, HConnection)}
+   */
+  @Deprecated
+  public ClientScanner98(final Configuration conf, final Scan scan,
+      final TableName tableName) throws IOException {
+    this(conf, scan, tableName, HConnectionManager.getConnection(conf));
+  }
+
+  /**
+   * @deprecated Use {@link #ClientScanner98(Configuration, Scan, TableName)}
+   */
+  @Deprecated
+  public ClientScanner98(final Configuration conf, final Scan scan,
+      final byte [] tableName) throws IOException {
+    this(conf, scan, TableName.valueOf(tableName));
+  }
+
+
+  /**
+   * Create a new ClientScanner for the specified table.
+   * Note that the passed {@link Scan}'s start row may be changed.
+   *
+   * @param conf The {@link Configuration} to use.
+   * @param scan {@link Scan} to use in this scanner
+   * @param tableName The table that we wish to scan
+   * @param connection Connection identifying the cluster
+   * @throws IOException if the scanner on the first region cannot be opened
+   */
+  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
+      HConnection connection) throws IOException {
+    this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf),
+        RpcControllerFactory.instantiate(conf));
+  }
+
+  /**
+   * @deprecated Use {@link #ClientScanner98(Configuration, Scan, TableName, HConnection)}
+   */
+  @Deprecated
+  public ClientScanner98(final Configuration conf, final Scan scan, final byte [] tableName,
+      HConnection connection) throws IOException {
+    this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
+        RpcControllerFactory.instantiate(conf));
+  }
+
+  /**
+   * @deprecated Use
+   *   {@link #ClientScanner98(Configuration, Scan, TableName, HConnection,
+   *   RpcRetryingCallerFactory, RpcControllerFactory)}
+   *   instead
+   */
+  @Deprecated
+  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
+      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException {
+    this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
+  }
+
+  /**
+   * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s start
+   * row may be changed. The scanner on the first region is opened before this constructor returns.
+   * @param conf The {@link Configuration} to use.
+   * @param scan {@link Scan} to use in this scanner
+   * @param tableName The table that we wish to scan
+   * @param connection Connection identifying the cluster
+   * @param rpcFactory factory of the caller used for every scanner RPC
+   * @param controllerFactory factory handed to each {@link ScannerCallable}
+   * @throws IOException if the scanner on the first region cannot be opened
+   */
+  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
+      HConnection connection, RpcRetryingCallerFactory rpcFactory,
+      RpcControllerFactory controllerFactory) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Scan table=" + tableName
+          + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
+    }
+    this.scan = scan;
+    this.tableName = tableName;
+    this.lastNext = System.currentTimeMillis();
+    this.connection = connection;
+    // Scan-level result size limit wins over the cluster-wide default.
+    if (scan.getMaxResultSize() > 0) {
+      this.maxScannerResultSize = scan.getMaxResultSize();
+    } else {
+      this.maxScannerResultSize = conf.getLong(
+        HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+    }
+    this.scannerTimeout = HBaseConfiguration.getInt(conf,
+      HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+
+    // check if application wants to collect scan metrics
+    initScanMetrics(scan);
+
+    // Use the caching from the Scan.  If not set, use the default cache setting for this table.
+    if (this.scan.getCaching() > 0) {
+      this.caching = this.scan.getCaching();
+    } else {
+      this.caching = conf.getInt(
+        HConstants.HBASE_CLIENT_SCANNER_CACHING,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+    }
+
+    this.caller = rpcFactory.<Result[]> newCaller();
+    this.rpcControllerFactory = controllerFactory;
+
+    initializeScannerInConstruction();
+  }
+
+  /** Opens the scanner on the first region; called at the end of construction. */
+  protected void initializeScannerInConstruction() throws IOException{
+    // initialize the scanner
+    nextScanner(this.caching, false);
+  }
+
+  protected HConnection getConnection() {
+    return this.connection;
+  }
+
+  /**
+   * @return Table name
+   * @deprecated Since 0.96.0; use {@link #getTable()}
+   */
+  @Deprecated
+  protected byte [] getTableName() {
+    return this.tableName.getName();
+  }
+
+  protected TableName getTable() {
+    return this.tableName;
+  }
+
+  protected Scan getScan() {
+    return scan;
+  }
+
+  /** @return time in milliseconds of the last successful scanner RPC (or of construction) */
+  protected long getTimestamp() {
+    return lastNext;
+  }
+
+  /**
+   * Checks whether the scan ends within the region that ends at {@code endKey}.
+   * @param endKey end key of the region that has just been scanned
+   * @return true if the scan has a stop row and that stop row is less than or equal to
+   *   {@code endKey}, i.e. no further region has to be scanned
+   */
+  protected boolean checkScanStopRow(final byte [] endKey) {
+    if (this.scan.getStopRow().length > 0) {
+      // there is a stop row, check to see if we are past it.
+      byte [] stopRow = scan.getStopRow();
+      int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
+        endKey, 0, endKey.length);
+      if (cmp <= 0) {
+        // stopRow <= endKey (endKey is equals to or larger than stopRow)
+        // This is a stop.
+        return true;
+      }
+    }
+    return false; //unlikely.
+  }
+
+  /**
+   * Gets a scanner for the next region.  If this.currentRegion != null, then
+   * we will move to the endrow of this.currentRegion.  Else we will get
+   * scanner at the scan.getStartRow().  We will go no further, just tidy
+   * up outstanding scanners, if <code>currentRegion != null</code> and
+   * <code>done</code> is true.
+   * @param nbRows caching (rows per RPC) to set on the newly opened scanner
+   * @param done Server-side says we're done scanning.
+   * @return true if a scanner was opened on another region; false if the scan is finished,
+   *   in which case this scanner has been closed
+   * @throws IOException if the new scanner cannot be opened (this scanner is closed first)
+   */
+  protected boolean nextScanner(int nbRows, final boolean done)
+  throws IOException {
+    // Close the previous scanner if it's open
+    if (this.callable != null) {
+      this.callable.setClose();
+      this.caller.callWithRetries(callable,scannerTimeout);
+      this.callable = null;
+    }
+
+    // Where to start the next scanner
+    byte [] localStartKey;
+
+    // if we're at end of table, close and return false to stop iterating
+    if (this.currentRegion != null) {
+      byte [] endKey = this.currentRegion.getEndKey();
+      if (endKey == null ||
+          Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
+          checkScanStopRow(endKey) ||
+          done) {
+        close();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Finished " + this.currentRegion);
+        }
+        return false;
+      }
+      localStartKey = endKey;
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Finished " + this.currentRegion);
+      }
+    } else {
+      localStartKey = this.scan.getStartRow();
+    }
+
+    if (LOG.isDebugEnabled() && this.currentRegion != null) {
+      // Only worth logging if NOT first region in scan.
+      LOG.debug("Advancing internal scanner to startKey at '" +
+        Bytes.toStringBinary(localStartKey) + "'");
+    }
+    try {
+      callable = getScannerCallable(localStartKey, nbRows);
+      // Open a scanner on the region server starting at the
+      // beginning of the region
+      this.caller.callWithRetries(callable,scannerTimeout);
+      this.currentRegion = callable.getHRegionInfo();
+      if (this.scanMetrics != null) {
+        this.scanMetrics.countOfRegions.incrementAndGet();
+      }
+    } catch (IOException e) {
+      close();
+      throw e;
+    }
+    return true;
+  }
+
+  /**
+   * Builds the callable for the region containing {@code localStartKey}.
+   * Side effect: the scan's start row is set to {@code localStartKey}.
+   */
+  @InterfaceAudience.Private
+  protected ScannerCallable getScannerCallable(byte [] localStartKey,
+      int nbRows) {
+    scan.setStartRow(localStartKey);
+    ScannerCallable s =
+        new ScannerCallable((ClusterConnection)getConnection(), getTable(), scan, this.scanMetrics,
+            this.rpcControllerFactory);
+    s.setCaching(nbRows);
+    return s;
+  }
+
+  /**
+   * Publish the scan metrics. For now, we use scan.setAttribute to pass the metrics back to the
+   * application or TableInputFormat. Later, we could push it to other systems. We don't use metrics
+   * framework because it doesn't support multi-instances of the same metrics on the same machine;
+   * for scan/map reduce scenarios, we will have multiple scans running at the same time.
+   *
+   * By default, scan metrics are disabled; if the application wants to collect them, this behavior
+   * can be turned on by calling:
+   *
+   * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.TRUE))
+   */
+  protected void writeScanMetrics() {
+    if (this.scanMetrics == null || scanMetricsPublished) {
+      return;
+    }
+    MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
+    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
+    scanMetricsPublished = true;
+  }
+
+  /**
+   * Returns the next row, or null once the scan is exhausted or the scanner is closed.
+   * When the local cache is empty, rows are fetched in batches of up to {@code caching} rows,
+   * bounded by {@code maxScannerResultSize} bytes, moving on to the next region as each region
+   * is exhausted. Moved regions, stopped servers, out-of-order calls and expired scanner leases
+   * reset the scanner after the last returned row instead of failing the scan.
+   */
+  @Override
+  public Result next() throws IOException {
+    // If the scanner is closed and there's nothing left in the cache, next is a no-op.
+    if (cache.isEmpty() && this.closed) {
+      return null;
+    }
+    if (cache.isEmpty()) {
+      Result [] values = null;
+      long remainingResultSize = maxScannerResultSize;
+      int countdown = this.caching;
+      // We need to reset it if it's a new callable that was created
+      // with a countdown in nextScanner
+      callable.setCaching(this.caching);
+      // This flag is set when we want to skip the result returned.  We do
+      // this when we reset scanner because it split under us.
+      boolean skipFirst = false;
+      boolean retryAfterOutOfOrderException = true;
+      do {
+        try {
+          if (skipFirst) {
+            // Skip only the first row (which was the last row of the last
+            // already-processed batch).
+            callable.setCaching(1);
+            values = this.caller.callWithRetries(callable,scannerTimeout);
+            callable.setCaching(this.caching);
+            skipFirst = false;
+          }
+          // Server returns a null values if scanning is to stop.  Else,
+          // returns an empty array if scanning is to go on and we've just
+          // exhausted current region.
+          values = this.caller.callWithRetries(callable,scannerTimeout);
+          retryAfterOutOfOrderException = true;
+        } catch (DoNotRetryIOException e) {
+          // DNRIOEs are thrown to make us break out of retries.  Some types of DNRIOEs want us
+          // to reset the scanner and come back in again.
+          //
+          // An UnknownScannerException is always handled by resetting the scanner: the
+          // lease-timeout check that would report it as a ScannerTimeoutException is disabled.
+          // NOTE(review): presumably so that a consumer pausing longer than the scanner lease
+          // between next() calls does not fail the scan; confirm before re-enabling it.
+          if (!(e instanceof UnknownScannerException)) {
+            // Only a moved region, a stopped region server or an out-of-order call lead to a
+            // scanner reset; anything else is thrown back to the client.
+            Throwable cause = e.getCause();
+            boolean resettable = cause instanceof NotServingRegionException
+                || cause instanceof RegionServerStoppedException
+                || e instanceof OutOfOrderScannerNextException;
+            if (!resettable) {
+              throw e;
+            }
+          }
+          // Else, it's a signal from depths of ScannerCallable that we need to reset the scanner.
+          if (this.lastResult != null) {
+            // The region has moved. We need to open a brand new scanner at
+            // the new location.
+            // Reset the startRow to the row we've seen last so that the new
+            // scanner starts at the correct row. Otherwise we may see previously
+            // returned rows again.
+            // (ScannerCallable by now has "relocated" the correct region)
+            this.scan.setStartRow(this.lastResult.getRow());
+
+            // Skip first row returned.  We already let it out on previous
+            // invocation.
+            skipFirst = true;
+          }
+          if (e instanceof OutOfOrderScannerNextException) {
+            if (retryAfterOutOfOrderException) {
+              retryAfterOutOfOrderException = false;
+            } else {
+              // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
+              throw new DoNotRetryIOException("Failed after retry of " +
+                "OutOfOrderScannerNextException: was there a rpc timeout?", e);
+            }
+          }
+          // Clear region.
+          this.currentRegion = null;
+          // Set this to null so we don't try and do an rpc and close on remote server when
+          // the exception we got was UnknownScanner or the Server is going down.
+          callable = null;
+          // This continue will take us to while at end of loop where we will set up new scanner.
+          continue;
+        }
+        long currentTime = System.currentTimeMillis();
+        if (this.scanMetrics != null ) {
+          this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
+        }
+        lastNext = currentTime;
+        if (values != null && values.length > 0) {
+          for (Result rs : values) {
+            cache.add(rs);
+            for (Cell kv : rs.rawCells()) {
+              // TODO make method in Cell or CellUtil
+              remainingResultSize -= KeyValueUtil.ensureKeyValue(kv).heapSize();
+            }
+            countdown--;
+            this.lastResult = rs;
+          }
+        }
+        // Values == null means server-side filter has determined we must STOP
+      } while (remainingResultSize > 0 && countdown > 0 && nextScanner(countdown, values == null));
+    }
+
+    if (!cache.isEmpty()) {
+      return cache.poll();
+    }
+
+    // if we exhausted this scanner before calling close, write out the scan metrics
+    writeScanMetrics();
+    return null;
+  }
+
+  /**
+   * Publishes the scan metrics (once) and closes the server-side scanner, if one is open.
+   * Never throws: close failures are ignored or logged.
+   */
+  @Override
+  public void close() {
+    if (!scanMetricsPublished) writeScanMetrics();
+    if (callable != null) {
+      callable.setClose();
+      try {
+        this.caller.callWithRetries(callable,scannerTimeout);
+      } catch (UnknownScannerException e) {
+         // We used to catch this error, interpret, and rethrow. However, we
+         // have since decided that it's not nice for a scanner's close to
+         // throw exceptions. Chances are it was just due to lease time out.
+      } catch (IOException e) {
+        /* An exception other than UnknownScanner is unexpected. */
+        LOG.warn("scanner failed to close. Exception follows: " + e);
+      }
+      callable = null;
+    }
+    closed = true;
+  }
+#ifndef CDH1.0
+  /**
+   * Renews the lease of the open server-side scanner without fetching rows.
+   * @return true if the renewal RPC succeeded; false if it failed or no scanner is open
+   */
+  @Override
+  public boolean renewLease() {
+    if (callable != null) {
+      // do not return any rows, do not advance the scanner
+      callable.setCaching(0);
+      try {
+        this.caller.callWithoutRetries(callable, this.scannerTimeout);
+      } catch (Exception e) {
+        return false;
+      } finally {
+        callable.setCaching(this.caching);
+      }
+      return true;
+    }
+    return false;
+  }
+#endif
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/TrafParallelClientScanner.java.tmpl
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/TrafParallelClientScanner.java.tmpl b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/TrafParallelClientScanner.java.tmpl
new file mode 100644
index 0000000..50842fc
--- /dev/null
+++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/TrafParallelClientScanner.java.tmpl
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+//import org.apache.hadoop.hbase.client.ClusterConnection;//for HBase 1.x
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all in parallel.
+ * <p>
+ * Every sub-scan runs as a {@link ScanTask} on a pooled daemon thread and pushes its rows into
+ * one bounded queue; {@link #next()} pops from that queue, so rows are returned as soon as any
+ * task produces them, NOT in rowkey order. Callers should always {@link #close()} the scanner.
+ */
+public class TrafParallelClientScanner extends AbstractClientScanner implements Closeable {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  static private Configuration config = HBaseConfiguration.create();
+
+  // special marker to indicate when a scanning task has finished
+  protected static final DoneMarker MARKER = new DoneMarker();
+
+  // the single reader object to use
+  protected ResultReader reader;
+
+  // the thread pool to be used (stays null when there is nothing to scan)
+  protected ExecutorService pool;
+
+  // set by close(): afterwards next() returns null instead of waiting for interrupted tasks,
+  // which may never post their done markers
+  private boolean closed = false;
+
+  // should store hbase.client.replicaCallTimeout.scan (HBase 1.x)
+  //static private int replicaCallTimeoutMicroSecondScan;
+
+  // static {
+  //   replicaCallTimeoutMicroSecondScan = config.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // duplicated from TableConfiguration since it cannot be instantiated for lack of a public constructor
+  // }
+
+  /**
+   * If a caller has predetermined split points this constructor can be used.
+   * One thread per split will be used to execute the scan.
+   * @param connection The cluster connection to use
+   * @param scans The {@link Scan} objects defining the split points; when empty,
+   *   {@link #next()} immediately returns null
+   * @param tableName The table to use
+   * @param bufferSize How many results to buffer
+   *   (should be larger than the individual scans' caching setting)
+   * @throws IllegalArgumentException if bufferSize is not positive
+   * @throws IOException
+   */
+  public TrafParallelClientScanner(final HConnection connection, final Collection<Scan> scans,
+      final TableName tableName, final int bufferSize) throws IOException {
+    // the queue is created first so that an invalid bufferSize fails before any thread is started
+    SingleReaderMultiWriterQueue rw = new SingleReaderMultiWriterQueue(bufferSize, scans.size());
+    this.reader = rw;
+    if (scans.isEmpty()) {
+      // nothing to scan: no thread pool needed (a pool of 0 threads cannot be created anyway)
+      return;
+    }
+    createDefaultPool(scans.size());
+    for (Scan s : scans) {
+      pool.submit(new ScanTask(s, connection, tableName, rw, 0));
+    }
+    // no more tasks will be submitted: let the threads exit once the submitted scans are done
+    pool.shutdown();
+  }
+
+  /**
+   * Execute a single scan across multiple threads. Results are returned when they are available,
+   * NOT in rowkey order.
+   * @param connection The cluster connection to use
+   * @param scan The scan to execute
+   * @param tableName The table to use
+   * @param parallelScaling The scaling factor to use. If between 0 and 1, the fraction of the
+   *   regions to scan in parallel (rounded up); if above 1, its ceiling is used as a fixed number
+   *   of threads. Never more threads than regions are started.
+   * @throws IllegalArgumentException if parallelScaling is not a positive number
+   * @throws IOException if the regions of the scanned range cannot be looked up
+   */
+  public TrafParallelClientScanner(final HConnection connection, final Scan scan,
+      final TableName tableName, final float parallelScaling) throws IOException {
+    // "!(x > 0)" rather than "x <= 0" so that NaN is rejected too
+    if (!(parallelScaling > 0))
+      throw new IllegalArgumentException("scalingFactor must > 0");
+
+    // get the region boundaries. null ES is fine, we're just using the table to lookup the region cache
+    HTable t = new HTable(tableName, connection, null);
+    List<HRegionLocation> locs;
+    try {
+      locs = t.getRegionsInRange(scan.getStartRow(), scan.getStopRow());
+    } finally {
+      t.close();
+    }
+    LOG.debug("Found "+locs.size()+" region(s).");
+    if (locs.size() == 0) {
+      // nothing to scan: an empty reader makes next() return null, and close() copes with no pool
+      this.reader = new SingleReaderMultiWriterQueue(1, 0);
+      return;
+    }
+
+    Map<HRegionLocation, Queue<Scan>> tasks = new HashMap<HRegionLocation, Queue<Scan>>(locs.size());
+    int i=0;
+    // organize region locations by region server
+    // NOTE(review): relies on HRegionLocation equality being per server; verify for the HBase version in use
+    for (HRegionLocation loc : locs) {
+      Scan s = new Scan(scan);
+      // the first and last sub-scans keep the caller's start/stop rows, the others use region boundaries
+      s.setStartRow(i==0?scan.getStartRow() : loc.getRegionInfo().getStartKey());
+      i++;
+      s.setStopRow(i==locs.size()?scan.getStopRow() : loc.getRegionInfo().getEndKey());
+      addToMapOfQueues(tasks, loc, s);
+    }
+
+    int threads = parallelScaling>1.0 ?(int)Math.ceil(parallelScaling) : (int)Math.ceil(locs.size() * parallelScaling);
+    // there is one task per region, so additional threads would only sit idle
+    threads = Math.min(threads, locs.size());
+
+    createDefaultPool(threads);
+    SingleReaderMultiWriterQueue rw = new SingleReaderMultiWriterQueue(Math.max(scan.getCaching() * threads, threads), locs.size());
+    this.reader = rw;
+
+    LOG.debug("Scheduling "+threads+" thread(s).");
+    // round robin among region servers
+    int taskNb=1;
+    while(!tasks.isEmpty()) {
+      for (Iterator<Map.Entry<HRegionLocation, Queue<Scan>>> it = tasks.entrySet().iterator(); it.hasNext();) {
+        Scan next = it.next().getValue().poll();
+        if (next == null) {
+          it.remove();
+        } else {
+          pool.submit(new ScanTask(next, connection, tableName, rw, taskNb));
+          taskNb++;
+        }
+      }
+    }
+    // no more tasks will be submitted: let the threads exit once the submitted scans are done
+    pool.shutdown();
+  }
+
+  /** Appends {@code value} to the queue stored under {@code key}, creating the queue if needed. */
+  private <K, V> Queue<V> addToMapOfQueues(Map<K, Queue<V>> map, K key, V value) {
+    Queue<V> values = map.get(key);
+    if (values == null) {
+      values = new ArrayDeque<V>();
+      map.put(key, values);
+    }
+    values.add(value);
+    return values;
+  }
+
+  /** Creates a fixed-size pool of {@code threads} daemon threads, all started eagerly. */
+  protected void createDefaultPool(int threads) {
+    pool = new ThreadPoolExecutor(threads, threads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        Threads.newDaemonThreadFactory("traf-parallel-scan"));
+    ((ThreadPoolExecutor)pool).prestartAllCoreThreads();
+  }
+
+  /* actual scanner methods */
+  /**
+   * Returns the next available row from any task, or null once every task has finished or the
+   * scanner has been closed. Blocks while tasks are still running and no row is queued.
+   * @throws IOException the first failure reported by a task; the scanner is closed first
+   */
+  @Override
+  public Result next() throws IOException {
+    if (closed) {
+      return null;
+    }
+    try {
+      return reader.take();
+    } catch (IOException iox) {
+      // kill other threads
+      close();
+      // rethrow
+      throw iox;
+    }
+  }
+
+  /**
+   * close the parallel scanner, callers are strongly encouraged to call this method
+   * doesn't wait until the threadpool actually closes
+   */
+  @Override
+  public void close() {
+    closed = true;
+    // interrupt all running threads, don't wait for completion
+    if (pool != null) {
+      pool.shutdownNow();
+    }
+  }
+#ifndef CDH1.0
+  @Override
+  public boolean renewLease() {return false;}//fake, never needed.
+#endif
+  ///// Helper classes and interfaces /////
+
+  // reader interface
+  private static interface ResultReader {
+    Result take() throws IOException;
+  }
+  // writer interface
+  private static interface ResultWriter {
+    // write a new result
+    void put(Result e) throws InterruptedException;
+    // the writer encountered an exception pass on to reader
+    void exception(Exception x);
+    // a writer thread is done, must be called by each thread
+    // that is writing to this ResultWriter
+    void done();
+  }
+
+  // a single reader, multiple writer queue, that reads Results as soon as they become available
+  private static class SingleReaderMultiWriterQueue implements ResultReader, ResultWriter {
+    private BlockingDeque<Result> queue;
+    // number of writers that have not yet posted their done marker; only touched by the reader
+    private int taskCount;
+    public SingleReaderMultiWriterQueue(int capacity, int taskCount) {
+      queue = new LinkedBlockingDeque<Result>(capacity);
+      this.taskCount = taskCount;
+    }
+
+    // writer --
+    @Override
+    public void put(Result e) throws InterruptedException {
+      queue.put(e);
+    }
+
+    @Override
+    public void exception(Exception x) {
+      try {
+        queue.put(new DoneMarker(x));
+      } catch (InterruptedException ix) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    public void done() {
+      try {
+        queue.put(MARKER);
+      } catch (InterruptedException ix) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    // reader --
+    // pop the next Result as it becomes available; once every writer has posted its done
+    // marker, poll() is used and null signals the end of the scan
+    @Override
+    public Result take() throws IOException {
+      while(true) {
+        Result r;
+        if (taskCount > 0) {
+          try {
+            // block if there are still tasks running
+            r = queue.take();
+          } catch (InterruptedException x) {
+            throw new IOException(x);
+          }
+          if (r instanceof DoneMarker) {
+            ((DoneMarker)r).rethrow();
+            taskCount--;
+            continue;
+          }
+        } else {
+          r = queue.poll();
+        }
+        return r;
+      }
+    }
+  }
+
+  /* marker class, also used to pass Exception along */
+  protected static class DoneMarker extends Result {
+    private Exception x = null;
+    public DoneMarker() {}
+
+    public DoneMarker(Exception x) {
+      this.x = x;
+    }
+    public void rethrow() throws IOException {
+      if (x != null)
+        throw x instanceof IOException ? (IOException)x : new IOException(x);
+    }
+  }
+
+  /* Task executing a scan request */
+  protected class ScanTask implements Runnable {
+    private Scan s;
+    private ResultWriter writer;
+    private HConnection connection;
+    private TableName tableName;
+    private int taskNb;
+    public ScanTask(Scan s, HConnection connection, TableName tableName, ResultWriter writer, int taskNb) {
+      this.s = s;
+      this.connection = connection;
+      this.tableName = tableName;
+      this.writer = writer;
+      this.taskNb = taskNb;
+    }
+
+    /**
+     * Streams every row of this task's scan into the writer, then posts exactly one marker:
+     * done() on success or exception(x) on failure. The region scanner is always closed first.
+     */
+    @Override
+    public void run() {
+      Thread current = Thread.currentThread();
+      ClientScanner98 scanner = null;
+      Exception failure = null;
+      try {
+        // for HBase 1.x
+        //ClientScanner98 scanner = new ClientScanner98(connection.getConfiguration(), s, tableName, (ClusterConnection)connection,
+        //                ((ClusterConnection)connection).getNewRpcRetryingCallerFactory(config),
+        //                RpcControllerFactory.instantiate(config));
+        scanner = new ClientScanner98(config, s, tableName, connection);
+
+        Result row;
+        // stop early once close() has interrupted this thread
+        while((row = scanner.next()) != null && !current.isInterrupted()) {
+          writer.put(row);
+        }
+      } catch (Exception x) {
+        failure = x;
+      } finally {
+        // release the server-side scanner on every path, including failures and interrupts
+        if (scanner != null) {
+          try {
+            scanner.close();
+          } catch (RuntimeException x) {
+            if (failure == null) {
+              failure = x;
+            }
+          }
+        }
+      }
+      if (failure == null) {
+        writer.done();
+      } else {
+        if (failure instanceof InterruptedException) {
+          // restore the interrupt status cleared by the throwing call
+          current.interrupt();
+        }
+        // record any exceptions encountered
+        writer.exception(failure);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
----------------------------------------------------------------------
diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
index b7481b9..a0e92ff 100644
---
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java @@ -423,9 +423,9 @@ public class RMInterface { { return ttable.getTableName(); } - public ResultScanner getScanner(Scan scan) throws IOException + public ResultScanner getScanner(Scan scan, float dopParallelScanner) throws IOException { - return ttable.getScanner(scan); + return ttable.getScanner(scan, dopParallelScanner); } public Result get(Get g) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java index 40b2eac..31e6d1b 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TrafParallelClientScanner; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; @@ -732,9 +733,12 @@ if (LOG.isTraceEnabled()) LOG.trace("checkAndPut, seting request startid: " + tr { return super.getTableName(); } - public ResultScanner 
getScanner(Scan scan) throws IOException + public ResultScanner getScanner(Scan scan, float DOPparallelScanner) throws IOException { - return super.getScanner(scan); + if (scan.isSmall() || DOPparallelScanner == 0) + return super.getScanner(scan); + else + return new TrafParallelClientScanner(this.connection, scan, getName(), DOPparallelScanner); } public Result get(Get g) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java index 609d9b6..b295171 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.TrafParallelClientScanner; import org.apache.hadoop.hbase.client.coprocessor.Batch; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos; import org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteRequest; @@ -806,9 +807,12 @@ public HRegionLocation getRegionLocation(byte[] row, boolean f) { return super.getTableName(); } - public ResultScanner getScanner(Scan scan) throws IOException + public ResultScanner getScanner(Scan scan, float DOPparallelScanner) throws IOException { - return super.getScanner(scan); + 
if (scan.isSmall() || DOPparallelScanner == 0) + return super.getScanner(scan); + else + return new TrafParallelClientScanner(this.connection, scan, getName(), DOPparallelScanner); } public Result get(Get g) throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java ---------------------------------------------------------------------- diff --git a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java index 187a15f..76e61d6 100644 --- a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java +++ b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java @@ -136,9 +136,8 @@ public interface TransactionalTableClient { byte[] getTableName(); - ResultScanner getScanner(Scan scan) throws IOException; + ResultScanner getScanner(Scan scan, float DOPparallelScanner) throws IOException; Result get(Get g) throws IOException; - Result[] get( List<Get> g) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/comexe/ComTdbHbaseAccess.h ---------------------------------------------------------------------- diff --git a/core/sql/comexe/ComTdbHbaseAccess.h b/core/sql/comexe/ComTdbHbaseAccess.h index c38f2f4..c785a8d 100644 --- a/core/sql/comexe/ComTdbHbaseAccess.h +++ b/core/sql/comexe/ComTdbHbaseAccess.h @@ -288,6 +288,9 @@ public: void setMaxNumRowsPerHbaseBlock(UInt32 n) { maxNumRowsPerHBaseBlock_ = n;} UInt32 maxNumRowsPerHbaseBlock() { return maxNumRowsPerHBaseBlock_; } + void setDopParallelScanner(Float32 f) { dopParallelScanner_ = f;} + Float32 
dopParallelScanner() { return dopParallelScanner_; } + private: enum { @@ -301,6 +304,7 @@ public: UInt32 flags_; UInt32 numCacheRows_; UInt32 maxNumRowsPerHBaseBlock_; + Float32 dopParallelScanner_; }; // --------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/executor/ExHbaseIUD.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHbaseIUD.cpp b/core/sql/executor/ExHbaseIUD.cpp index 262505b..bf3103a 100644 --- a/core/sql/executor/ExHbaseIUD.cpp +++ b/core/sql/executor/ExHbaseIUD.cpp @@ -2786,7 +2786,8 @@ ExWorkProcRetcode ExHbaseUMDtrafSubsetTaskTcb::work(short &rc) tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(), tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(), tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(), - FALSE, NULL, NULL, NULL); + FALSE, NULL, NULL, NULL, + tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner()); if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen")) step_ = HANDLE_ERROR; else @@ -3214,7 +3215,8 @@ ExWorkProcRetcode ExHbaseUMDnativeSubsetTaskTcb::work(short &rc) tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(), tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(), tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(), - FALSE, NULL, NULL, NULL); + FALSE, NULL, NULL, NULL, + tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner()); if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen")) step_ = HANDLE_ERROR; else http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/executor/ExHbaseSelect.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/ExHbaseSelect.cpp b/core/sql/executor/ExHbaseSelect.cpp index 2bd904b..f7d1ba7 100644 --- a/core/sql/executor/ExHbaseSelect.cpp +++ 
b/core/sql/executor/ExHbaseSelect.cpp @@ -90,6 +90,7 @@ ExWorkProcRetcode ExHbaseScanTaskTcb::work(short &rc) &tcb_->hbaseFilterOps_ : NULL), (tcb_->hbaseFilterValues_.entries() > 0 ? &tcb_->hbaseFilterValues_ : NULL), + tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(), tcb_->getSamplePercentage(), FALSE, 0, NULL, NULL, 0, (tcb_->hbaseAccessTdb().getHbaseAccessOptions() @@ -275,6 +276,7 @@ ExWorkProcRetcode ExHbaseScanRowwiseTaskTcb::work(short &rc) &tcb_->hbaseFilterOps_ : NULL), (tcb_->hbaseFilterValues_.entries() > 0 ? &tcb_->hbaseFilterValues_ : NULL), + tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(), tcb_->getSamplePercentage(), FALSE, 0, NULL, NULL, 0, (tcb_->hbaseAccessTdb().getHbaseAccessOptions() @@ -492,6 +494,7 @@ ExWorkProcRetcode ExHbaseScanSQTaskTcb::work(short &rc) &tcb_->hbaseFilterOps_ : NULL), (tcb_->hbaseFilterValues_.entries() > 0 ? &tcb_->hbaseFilterValues_ : NULL), + tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(), tcb_->getSamplePercentage(), tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getUseSnapshotScan(), tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getSnapshotScanTimeout(), http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/executor/HBaseClient_JNI.cpp ---------------------------------------------------------------------- diff --git a/core/sql/executor/HBaseClient_JNI.cpp b/core/sql/executor/HBaseClient_JNI.cpp index f0d50ab..1c25123 100644 --- a/core/sql/executor/HBaseClient_JNI.cpp +++ b/core/sql/executor/HBaseClient_JNI.cpp @@ -3585,7 +3585,7 @@ HTC_RetCode HTableClient_JNI::init() JavaMethods_[JM_GET_ERROR ].jm_name = "getLastError"; JavaMethods_[JM_GET_ERROR ].jm_signature = "()Ljava/lang/String;"; JavaMethods_[JM_SCAN_OPEN ].jm_name = "startScan"; - JavaMethods_[JM_SCAN_OPEN ].jm_signature = 
"(J[B[B[Ljava/lang/Object;JZZI[Ljava/lang/Object;[Ljava/lang/Object;[Ljava/lang/Object;FZZILjava/lang/String;Ljava/lang/String;II)Z"; + JavaMethods_[JM_SCAN_OPEN ].jm_signature = "(J[B[B[Ljava/lang/Object;JZZI[Ljava/lang/Object;[Ljava/lang/Object;[Ljava/lang/Object;FFZZILjava/lang/String;Ljava/lang/String;II)Z"; JavaMethods_[JM_DELETE ].jm_name = "deleteRow"; JavaMethods_[JM_DELETE ].jm_signature = "(J[B[Ljava/lang/Object;JZ)Z"; JavaMethods_[JM_COPROC_AGGR ].jm_name = "coProcAggr"; @@ -3631,6 +3631,7 @@ HTC_RetCode HTableClient_JNI::startScan(Int64 transID, const Text& startRowID, const LIST(NAString) *inColNamesToFilter, const LIST(NAString) *inCompareOpList, const LIST(NAString) *inColValuesToCompare, + Float32 dopParallelScanner, Float32 samplePercent, NABoolean useSnapshotScan, Lng32 snapTimeout, @@ -3754,6 +3755,7 @@ HTC_RetCode HTableClient_JNI::startScan(Int64 transID, const Text& startRowID, return HTC_ERROR_SCANOPEN_PARAM; } } + jfloat j_dopParallelScanner = dopParallelScanner; jfloat j_smplPct = samplePercent; jboolean j_useSnapshotScan = useSnapshotScan; jint j_snapTimeout = snapTimeout; @@ -3786,6 +3788,7 @@ HTC_RetCode HTableClient_JNI::startScan(Int64 transID, const Text& startRowID, JavaMethods_[JM_SCAN_OPEN].methodID, j_tid, jba_startRowID, jba_stopRowID, j_cols, j_ts, j_cb, j_smallScanner, j_ncr, j_colnamestofilter, j_compareoplist, j_colvaluestocompare, + j_dopParallelScanner, j_smplPct, j_preFetch, j_useSnapshotScan, j_snapTimeout, js_snapName, js_tmp_loc, j_espNum, j_versions); http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/executor/HBaseClient_JNI.h ---------------------------------------------------------------------- diff --git a/core/sql/executor/HBaseClient_JNI.h b/core/sql/executor/HBaseClient_JNI.h index 9315024..373098c 100644 --- a/core/sql/executor/HBaseClient_JNI.h +++ b/core/sql/executor/HBaseClient_JNI.h @@ -249,6 +249,7 @@ public: const LIST(NAString) *inColNamesToFilter, const LIST(NAString) 
*inCompareOpList, const LIST(NAString) *inColValuesToCompare, + Float32 dopParallelScanner = 0.0f, Float32 samplePercent = -1.0f, NABoolean useSnapshotScan = FALSE, Lng32 snapTimeout = 0, http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/exp/ExpHbaseInterface.cpp ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpHbaseInterface.cpp b/core/sql/exp/ExpHbaseInterface.cpp index c7914d0..7fa7d53 100644 --- a/core/sql/exp/ExpHbaseInterface.cpp +++ b/core/sql/exp/ExpHbaseInterface.cpp @@ -629,13 +629,14 @@ Lng32 ExpHbaseInterface_JNI::scanOpen( const LIST(NAString) *inColNamesToFilter, const LIST(NAString) *inCompareOpList, const LIST(NAString) *inColValuesToCompare, + Float32 dopParallelScanner, Float32 samplePercent, NABoolean useSnapshotScan, Lng32 snapTimeout, char * snapName, char * tmpLoc, Lng32 espNum, - Lng32 versions) + Lng32 versions) { htc_ = client_->getHTableClient((NAHeap *)heap_, tblName.val, useTRex_, hbs_); if (htc_ == NULL) @@ -659,6 +660,7 @@ Lng32 ExpHbaseInterface_JNI::scanOpen( inColNamesToFilter, inCompareOpList, inColValuesToCompare, + dopParallelScanner, samplePercent, useSnapshotScan, snapTimeout, http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/exp/ExpHbaseInterface.h ---------------------------------------------------------------------- diff --git a/core/sql/exp/ExpHbaseInterface.h b/core/sql/exp/ExpHbaseInterface.h index f711bcf..83d6c55 100644 --- a/core/sql/exp/ExpHbaseInterface.h +++ b/core/sql/exp/ExpHbaseInterface.h @@ -158,6 +158,7 @@ class ExpHbaseInterface : public NABasicObject const LIST(NAString) *inColNamesToFilter, const LIST(NAString) *inCompareOpList, const LIST(NAString) *inColValuesToCompare, + Float32 dopParallelScanner = 0.0f, Float32 samplePercent = -1.0f, NABoolean useSnapshotScan = FALSE, Lng32 snapTimeout = 0, @@ -470,6 +471,7 @@ class ExpHbaseInterface_JNI : public ExpHbaseInterface const LIST(NAString) 
*inColNamesToFilter, const LIST(NAString) *inCompareOpList, const LIST(NAString) *inColValuesToCompare, + Float32 DOPparallelScanner = 0.0f, Float32 samplePercent = -1.0f, NABoolean useSnapshotScan = FALSE, Lng32 snapTimeout = 0, http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/generator/GenExplain.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenExplain.cpp b/core/sql/generator/GenExplain.cpp index e419dca..42a29df 100644 --- a/core/sql/generator/GenExplain.cpp +++ b/core/sql/generator/GenExplain.cpp @@ -956,6 +956,13 @@ HbaseAccess::addSpecificExplainInfo(ExplainTupleMaster *explainTuple, } char buf[20]; + + if ((((ComTdbHbaseAccess *)tdb)->getHbasePerfAttributes()->dopParallelScanner())>0.0) { + description += "parallel_scanner: " ; + sprintf(buf, "%g ", ((ComTdbHbaseAccess *)tdb)->getHbasePerfAttributes()->dopParallelScanner()); + description += buf; + } + if ( getProbes().getValue() > 0.0 ) { description += "probes: "; // total number of probes sprintf(buf, "%g ", getProbes().getValue()); http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/generator/GenRelScan.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/GenRelScan.cpp b/core/sql/generator/GenRelScan.cpp index b56a7ae..e1b9468 100644 --- a/core/sql/generator/GenRelScan.cpp +++ b/core/sql/generator/GenRelScan.cpp @@ -2815,6 +2815,7 @@ short HbaseAccess::codeGen(Generator * generator) hbaseBlockSize, hbpa); + generator->setHBaseParallelScanner(hbpa); ComTdbHbaseAccess::HbaseAccessOptions * hbo = NULL; http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/generator/Generator.cpp ---------------------------------------------------------------------- diff --git a/core/sql/generator/Generator.cpp b/core/sql/generator/Generator.cpp index 24b7eb9..f595192 100644 --- a/core/sql/generator/Generator.cpp +++ 
b/core/sql/generator/Generator.cpp @@ -3158,4 +3158,7 @@ void Generator::setHBaseSmallScanner(Int32 hbaseRowSize, double estRowsAccessed, hbpa->setMaxNumRowsPerHbaseBlock(hbaseBlockSize/hbaseRowSize); } +void Generator::setHBaseParallelScanner(ComTdbHbaseAccess::HbasePerfAttributes * hbpa){ + hbpa->setDopParallelScanner(CmpCommon::getDefaultNumeric(HBASE_DOP_PARALLEL_SCANNER)); +} http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/generator/Generator.h ---------------------------------------------------------------------- diff --git a/core/sql/generator/Generator.h b/core/sql/generator/Generator.h index a057d33..7cd02d6 100644 --- a/core/sql/generator/Generator.h +++ b/core/sql/generator/Generator.h @@ -1641,7 +1641,7 @@ public: ComTdbHbaseAccess::HbasePerfAttributes * hbpa); void setHBaseSmallScanner(Int32 hbaseRowSize, double rowsAccessed, Int32 hbaseBlockSize, ComTdbHbaseAccess::HbasePerfAttributes * hbpa); - + void setHBaseParallelScanner(ComTdbHbaseAccess::HbasePerfAttributes * hbpa); NASet<Int64> &objectUids() { return objectUids_; } http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/regress/executor/EXPECTED140 ---------------------------------------------------------------------- diff --git a/core/sql/regress/executor/EXPECTED140 b/core/sql/regress/executor/EXPECTED140 index c82aabd..00d157c 100644 --- a/core/sql/regress/executor/EXPECTED140 +++ b/core/sql/regress/executor/EXPECTED140 @@ -45,6 +45,15 @@ --- 1 row(s) updated. >> +>>upsert using load ++>into t140b ++> select x1,x1,10*x1,100*x1,1000*x1,10*x1+1,100*x1+1, 1000*x1+1, 'aaa'||cast(x1 as varchar(10)) ++> ++> from t140helper ++>transpose 0,1,2,3,4,5,6,7,8,9 as x1; + +--- 10 row(s) inserted. 
+>> >>obey TEST140(run); >>-- test returned rows with or without adding key column and test of all >>pushdown functions with null or non null column >>-- only one column retrieved @@ -53,7 +62,7 @@ ------------------------------------------------------------------ PLAN SUMMARY MODULE_NAME .............. DYNAMICALLY COMPILED STATEMENT_NAME ........... NOT NAMED -PLAN_ID .................. 212321974183907757 +PLAN_ID .................. 212326821484634327 ROWS_OUT ................ 33 EST_TOTAL_COST ........... 0.05 STATEMENT ................ select a from t140 where b>500; @@ -106,7 +115,7 @@ DESCRIPTION DDL_TRANSACTIONS ....... ON SCHEMA ................. TRAFODION.SCH GENERATE_EXPLAIN ....... ON - ObjectUIDs ............. 7577396214768470203 + ObjectUIDs ............. 2948826545036618858 select_list ............ TRAFODION.SCH.T140.A @@ -154,7 +163,7 @@ A ------------------------------------------------------------------ PLAN SUMMARY MODULE_NAME .............. DYNAMICALLY COMPILED STATEMENT_NAME ........... NOT NAMED -PLAN_ID .................. 212321974185251743 +PLAN_ID .................. 212326821484786794 ROWS_OUT ................ 33 EST_TOTAL_COST ........... 0.05 STATEMENT ................ select an from t140b where b<=200; @@ -207,7 +216,7 @@ DESCRIPTION DDL_TRANSACTIONS ....... ON SCHEMA ................. TRAFODION.SCH GENERATE_EXPLAIN ....... ON - ObjectUIDs ............. 7577396214768470287 + ObjectUIDs ............. 2948826545036618945 select_list ............ TRAFODION.SCH.T140B.AN @@ -242,7 +251,7 @@ DESCRIPTION ------------------------------------------------------------------ PLAN SUMMARY MODULE_NAME .............. DYNAMICALLY COMPILED STATEMENT_NAME ........... NOT NAMED -PLAN_ID .................. 212321974185404440 +PLAN_ID .................. 212326821484919166 ROWS_OUT ................ 33 EST_TOTAL_COST ........... 0.05 STATEMENT ................ select an from t140 where b<=200; @@ -295,7 +304,7 @@ DESCRIPTION DDL_TRANSACTIONS ....... 
ON SCHEMA ................. TRAFODION.SCH GENERATE_EXPLAIN ....... ON - ObjectUIDs ............. 7577396214768470203 + ObjectUIDs ............. 2948826545036618858 select_list ............ TRAFODION.SCH.T140.AN @@ -341,7 +350,7 @@ AN ------------------------------------------------------------------ PLAN SUMMARY MODULE_NAME .............. DYNAMICALLY COMPILED STATEMENT_NAME ........... NOT NAMED -PLAN_ID .................. 212321974185568983 +PLAN_ID .................. 212326821485075357 ROWS_OUT ................ 10 EST_TOTAL_COST ........... 0.05 STATEMENT ................ select an from t140 where b=200 and an is not null; @@ -394,7 +403,7 @@ DESCRIPTION DDL_TRANSACTIONS ....... ON SCHEMA ................. TRAFODION.SCH GENERATE_EXPLAIN ....... ON - ObjectUIDs ............. 7577396214768470203 + ObjectUIDs ............. 2948826545036618858 select_list ............ TRAFODION.SCH.T140.AN input_variables ........ %(200) @@ -438,7 +447,7 @@ AN ------------------------------------------------------------------ PLAN SUMMARY MODULE_NAME .............. DYNAMICALLY COMPILED STATEMENT_NAME ........... NOT NAMED -PLAN_ID .................. 212321974185775205 +PLAN_ID .................. 212326821485216670 ROWS_OUT ................ 67 EST_TOTAL_COST ........... 0.05 STATEMENT ................ select an, a from t140 where b!=500; @@ -491,7 +500,7 @@ DESCRIPTION DDL_TRANSACTIONS ....... ON SCHEMA ................. TRAFODION.SCH GENERATE_EXPLAIN ....... ON - ObjectUIDs ............. 7577396214768470203 + ObjectUIDs ............. 2948826545036618858 select_list ............ TRAFODION.SCH.T140.AN, TRAFODION.SCH.T140.A @@ -571,7 +580,7 @@ AN ------------------------------------------------------------------ PLAN SUMMARY MODULE_NAME .............. DYNAMICALLY COMPILED STATEMENT_NAME ........... NOT NAMED -PLAN_ID .................. 212321974185993581 +PLAN_ID .................. 212326821485412711 ROWS_OUT ................ 10 EST_TOTAL_COST ........... 0.05 STATEMENT ................ 
select an from t140 where bn=201 and an is not null; @@ -624,7 +633,7 @@ DESCRIPTION DDL_TRANSACTIONS ....... ON SCHEMA ................. TRAFODION.SCH GENERATE_EXPLAIN ....... ON - ObjectUIDs ............. 7577396214768470203 + ObjectUIDs ............. 2948826545036618858 select_list ............ TRAFODION.SCH.T140.AN input_variables ........ %(201) @@ -667,7 +676,7 @@ AN ------------------------------------------------------------------ PLAN SUMMARY MODULE_NAME .............. DYNAMICALLY COMPILED STATEMENT_NAME ........... NOT NAMED -PLAN_ID .................. 212321974186137176 +PLAN_ID .................. 212326821485540922 ROWS_OUT ................ 67 EST_TOTAL_COST ........... 0.05 STATEMENT ................ select an, a from t140 where bn!=501; @@ -720,7 +729,7 @@ DESCRIPTION DDL_TRANSACTIONS ....... ON SCHEMA ................. TRAFODION.SCH GENERATE_EXPLAIN ....... ON - ObjectUIDs ............. 7577396214768470203 + ObjectUIDs ............. 2948826545036618858 select_list ............ TRAFODION.SCH.T140.AN, TRAFODION.SCH.T140.A @@ -815,7 +824,7 @@ A ------------------------------------------------------------------ PLAN SUMMARY MODULE_NAME .............. DYNAMICALLY COMPILED STATEMENT_NAME ........... NOT NAMED -PLAN_ID .................. 212321974186325095 +PLAN_ID .................. 212326821485768519 ROWS_OUT ................ 11 EST_TOTAL_COST ........... 0.05 STATEMENT ................ select an from t140 where an between 20 and 40; @@ -868,7 +877,7 @@ DESCRIPTION DDL_TRANSACTIONS ....... ON SCHEMA ................. TRAFODION.SCH GENERATE_EXPLAIN ....... ON - ObjectUIDs ............. 7577396214768470203 + ObjectUIDs ............. 2948826545036618858 select_list ............ TRAFODION.SCH.T140.AN @@ -918,6 +927,400 @@ AN --- 4 row(s) selected. >> +>>-- test parallel scanner +>>-- turn off small scanner as it will force single scanner +>>cqd hbase_small_scanner 'OFF'; + +--- SQL operation complete. 
+>>cqd parallel_num_esps '1'; + +--- SQL operation complete. +>>-- force 2 threads +>>cqd hbase_dop_parallel_scanner '2.0'; + +--- SQL operation complete. +>>explain select avg(a) from t140b; + +------------------------------------------------------------------ PLAN SUMMARY +MODULE_NAME .............. DYNAMICALLY COMPILED +STATEMENT_NAME ........... NOT NAMED +PLAN_ID .................. 212326821485986085 +ROWS_OUT ................. 1 +EST_TOTAL_COST ........... 0.05 +STATEMENT ................ select avg(a) from t140b; + + +------------------------------------------------------------------ NODE LISTING +ROOT ====================================== SEQ_NO 3 ONLY CHILD 2 +REQUESTS_IN .............. 1 +ROWS_OUT ................. 1 +EST_OPER_COST ............ 0 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est ........... 1 + fragment_id ............ 0 + parent_frag ............ (none) + fragment_type .......... master + statement_index ........ 0 + affinity_value ......... 0 + max_max_cardinality 100 + total_overflow_size .... 0.00 KB + xn_access_mode ......... read_only + xn_autoabort_interval 0 + auto_query_retry ....... enabled + plan_version ....... 2,600 + embedded_arkcmp ........ used + IS_SQLCI ............... ON + LDAP_USERNAME + TARGET_CODE ............ DEBUG + TARGET_CPU_FREQUENCY 199 + TARGET_IO_SEEK_TIME .... 0.02 + TARGET_IO_SEQ_READ_RATE 2.5 + TARGET_MSG_LOCAL_RATE 10 + TARGET_MSG_LOCAL_TIME 0.01 + TARGET_MSG_REMOTE_RAT 100 + TARGET_MSG_REMOTE_TIME 0.01 + ARKCMP_FAKE_HW ......... ON + SKIP_METADATA_VIEWS .... ON + DEF_NUM_SMP_CPUS ....... 2 + MAX_ESPS_PER_CPU_PER_OP 1 + DEF_NUM_NODES_IN_ACTIVE 1 + POS_ALLOW_NON_PK_TABLES ON + MODE_SEABASE ........... ON + SEABASE_VOLATILE_TABLES ON + HBASE_ASYNC_DROP_TABLE OFF + HBASE_SERIALIZATION .... ON + HBASE_FILTER_PREDS ..... 2 + TRAF_ALIGNED_ROW_FORMAT OFF + TRAF_INDEX_CREATE_OPT ON + DDL_TRANSACTIONS ....... ON + SCHEMA ................. TRAFODION.SCH + HBASE_SMALL_SCANNER .... 
OFF + PARALLEL_NUM_ESPS ...... 1 + HBASE_DOP_PARALLEL_SCAN 2 + GENERATE_EXPLAIN ....... ON + ObjectUIDs ............. 2948826545036618945 + select_list ............ cast(cast((cast((cast((cast(sum(TRAFODION.SCH.T140B. + A)) * 10000 ...0)) / cast(count(1 )))) / 10000 + ...0))) + + +SORT_SCALAR_AGGR ========================== SEQ_NO 2 ONLY CHILD 1 +REQUESTS_IN .............. 1 +ROWS_OUT ................. 1 +EST_OPER_COST ............ 0.01 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est ........... 1 + fragment_id ............ 0 + parent_frag ............ (none) + fragment_type .......... master + aggregates ............. sum(TRAFODION.SCH.T140B.A), count(1 ) + + +TRAFODION_SCAN ============================ SEQ_NO 1 NO CHILDREN +TABLE_NAME ............... T140B +REQUESTS_IN .............. 1 +ROWS_OUT ............... 100 +EST_OPER_COST ............ 0.05 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est ......... 100 + fragment_id ............ 0 + parent_frag ............ (none) + fragment_type .......... master + scan_type .............. subset scan of table TRAFODION.SCH.T140B + object_type ............ Trafodion + columns ................ all + begin_keys(incl) + end_keys(incl) + cache_size ........... 100 + parallel_scanner ....... 2 + probes ................. 1 + rows_accessed ........ 100 + column_retrieved ....... #1:3 + key_columns ............ _SALT_, UNIQ, UNIQ2 + +--- SQL operation complete. +>>select avg(a) from t140b; + +(EXPR) +-------------------- + + 45 + +--- 1 row(s) selected. +>>-- force 100% of threads (with 2 partition this is 2 threads) +>>cqd hbase_dop_parallel_scanner '1.0'; + +--- SQL operation complete. +>>explain select avg(a) from t140b; + +------------------------------------------------------------------ PLAN SUMMARY +MODULE_NAME .............. DYNAMICALLY COMPILED +STATEMENT_NAME ........... NOT NAMED +PLAN_ID .................. 212326821486288071 +ROWS_OUT ................. 1 +EST_TOTAL_COST ........... 
0.05 +STATEMENT ................ select avg(a) from t140b; + + +------------------------------------------------------------------ NODE LISTING +ROOT ====================================== SEQ_NO 3 ONLY CHILD 2 +REQUESTS_IN .............. 1 +ROWS_OUT ................. 1 +EST_OPER_COST ............ 0 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est ........... 1 + fragment_id ............ 0 + parent_frag ............ (none) + fragment_type .......... master + statement_index ........ 0 + affinity_value ......... 0 + max_max_cardinality 100 + total_overflow_size .... 0.00 KB + xn_access_mode ......... read_only + xn_autoabort_interval 0 + auto_query_retry ....... enabled + plan_version ....... 2,600 + embedded_arkcmp ........ used + IS_SQLCI ............... ON + LDAP_USERNAME + TARGET_CODE ............ DEBUG + TARGET_CPU_FREQUENCY 199 + TARGET_IO_SEEK_TIME .... 0.02 + TARGET_IO_SEQ_READ_RATE 2.5 + TARGET_MSG_LOCAL_RATE 10 + TARGET_MSG_LOCAL_TIME 0.01 + TARGET_MSG_REMOTE_RAT 100 + TARGET_MSG_REMOTE_TIME 0.01 + ARKCMP_FAKE_HW ......... ON + SKIP_METADATA_VIEWS .... ON + DEF_NUM_SMP_CPUS ....... 2 + MAX_ESPS_PER_CPU_PER_OP 1 + DEF_NUM_NODES_IN_ACTIVE 1 + POS_ALLOW_NON_PK_TABLES ON + MODE_SEABASE ........... ON + SEABASE_VOLATILE_TABLES ON + HBASE_ASYNC_DROP_TABLE OFF + HBASE_SERIALIZATION .... ON + HBASE_FILTER_PREDS ..... 2 + TRAF_ALIGNED_ROW_FORMAT OFF + TRAF_INDEX_CREATE_OPT ON + DDL_TRANSACTIONS ....... ON + SCHEMA ................. TRAFODION.SCH + HBASE_SMALL_SCANNER .... OFF + PARALLEL_NUM_ESPS ...... 1 + HBASE_DOP_PARALLEL_SCAN 1 + GENERATE_EXPLAIN ....... ON + ObjectUIDs ............. 2948826545036618945 + select_list ............ cast(cast((cast((cast((cast(sum(TRAFODION.SCH.T140B. + A)) * 10000 ...0)) / cast(count(1 )))) / 10000 + ...0))) + + +SORT_SCALAR_AGGR ========================== SEQ_NO 2 ONLY CHILD 1 +REQUESTS_IN .............. 1 +ROWS_OUT ................. 1 +EST_OPER_COST ............ 0.01 +EST_TOTAL_COST ........... 
0.05 +DESCRIPTION + max_card_est ........... 1 + fragment_id ............ 0 + parent_frag ............ (none) + fragment_type .......... master + aggregates ............. sum(TRAFODION.SCH.T140B.A), count(1 ) + + +TRAFODION_SCAN ============================ SEQ_NO 1 NO CHILDREN +TABLE_NAME ............... T140B +REQUESTS_IN .............. 1 +ROWS_OUT ............... 100 +EST_OPER_COST ............ 0.05 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est ......... 100 + fragment_id ............ 0 + parent_frag ............ (none) + fragment_type .......... master + scan_type .............. subset scan of table TRAFODION.SCH.T140B + object_type ............ Trafodion + columns ................ all + begin_keys(incl) + end_keys(incl) + cache_size ........... 100 + parallel_scanner ....... 1 + probes ................. 1 + rows_accessed ........ 100 + column_retrieved ....... #1:3 + key_columns ............ _SALT_, UNIQ, UNIQ2 + +--- SQL operation complete. +>>select avg(a) from t140b; + +(EXPR) +-------------------- + + 45 + +--- 1 row(s) selected. +>>-- reset to regular scanner +>>cqd hbase_dop_parallel_scanner reset; + +--- SQL operation complete. +>>cqd hbase_small_scanner reset; + +--- SQL operation complete. +>>cqd parallel_num_esps reset; + +--- SQL operation complete. +>>explain select avg(a) from t140b; + +------------------------------------------------------------------ PLAN SUMMARY +MODULE_NAME .............. DYNAMICALLY COMPILED +STATEMENT_NAME ........... NOT NAMED +PLAN_ID .................. 212326821486475177 +ROWS_OUT ................. 1 +EST_TOTAL_COST ........... 0.05 +STATEMENT ................ select avg(a) from t140b; + + +------------------------------------------------------------------ NODE LISTING +ROOT ====================================== SEQ_NO 5 ONLY CHILD 4 +REQUESTS_IN .............. 1 +ROWS_OUT ................. 1 +EST_OPER_COST ............ 0 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est ........... 
1 + fragment_id ............ 0 + parent_frag ............ (none) + fragment_type .......... master + statement_index ........ 0 + affinity_value ......... 0 + est_memory_per_cpu ..... 1 KB + max_max_cardinality 100 + total_overflow_size .... 0.00 KB + esp_2_node_map ......... (\NSK:-1:-1) + xn_access_mode ......... read_only + xn_autoabort_interval 0 + auto_query_retry ....... enabled + plan_version ....... 2,600 + embedded_arkcmp ........ used + IS_SQLCI ............... ON + LDAP_USERNAME + TARGET_CODE ............ DEBUG + TARGET_CPU_FREQUENCY 199 + TARGET_IO_SEEK_TIME .... 0.02 + TARGET_IO_SEQ_READ_RATE 2.5 + TARGET_MSG_LOCAL_RATE 10 + TARGET_MSG_LOCAL_TIME 0.01 + TARGET_MSG_REMOTE_RAT 100 + TARGET_MSG_REMOTE_TIME 0.01 + ARKCMP_FAKE_HW ......... ON + SKIP_METADATA_VIEWS .... ON + DEF_NUM_SMP_CPUS ....... 2 + MAX_ESPS_PER_CPU_PER_OP 1 + DEF_NUM_NODES_IN_ACTIVE 1 + POS_ALLOW_NON_PK_TABLES ON + MODE_SEABASE ........... ON + SEABASE_VOLATILE_TABLES ON + HBASE_ASYNC_DROP_TABLE OFF + HBASE_SERIALIZATION .... ON + HBASE_FILTER_PREDS ..... 2 + TRAF_ALIGNED_ROW_FORMAT OFF + TRAF_INDEX_CREATE_OPT ON + DDL_TRANSACTIONS ....... ON + SCHEMA ................. TRAFODION.SCH + GENERATE_EXPLAIN ....... ON + ObjectUIDs ............. 2948826545036618945 + select_list ............ cast(cast(cast((cast((cast((cast(sum(sum(TRAFODION.S + CH.T140B.A))) * 10000 ...0)) / cast(sum(count(1 + ))))) / 10000 ...0)))) + + +SORT_PARTIAL_AGGR_ROOT ==================== SEQ_NO 4 ONLY CHILD 3 +REQUESTS_IN .............. 1 +ROWS_OUT ................. 1 +EST_OPER_COST ............ 0.01 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est ........... 1 + fragment_id ............ 0 + parent_frag ............ (none) + fragment_type .......... master + aggregates ............. sum(sum(TRAFODION.SCH.T140B.A)), sum(count(1 )) + + +ESP_EXCHANGE ============================== SEQ_NO 3 ONLY CHILD 2 +REQUESTS_IN .............. 1 +ROWS_OUT ................ 50 +EST_OPER_COST ............ 
0.01 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est .......... 50 + fragment_id ............ 2 + parent_frag ............ 0 + fragment_type .......... esp + est_memory_per_cpu ..... 1 KB + buffer_size ........ 6,250 + record_length ......... 24 + parent_processes ....... 1 + child_processes ........ 2 + child_partitioning_func hash2 partitioned 2 ways on + (TRAFODION.SCH.T140B.UNIQ, + TRAFODION.SCH.T140B.UNIQ2) + seamonster_query ....... no + seamonster_exchange .... no + + +SORT_PARTIAL_AGGR_LEAF ==================== SEQ_NO 2 ONLY CHILD 1 +REQUESTS_IN .............. 1 +ROWS_OUT ................ 50 +EST_OPER_COST ............ 0.01 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est .......... 50 + fragment_id ............ 2 + parent_frag ............ 0 + fragment_type .......... esp + aggregates ............. sum(TRAFODION.SCH.T140B.A), count(1 ) + + +TRAFODION_SCAN ============================ SEQ_NO 1 NO CHILDREN +TABLE_NAME ............... T140B +REQUESTS_IN .............. 1 +ROWS_OUT ............... 100 +EST_OPER_COST ............ 0.05 +EST_TOTAL_COST ........... 0.05 +DESCRIPTION + max_card_est ......... 100 + fragment_id ............ 2 + parent_frag ............ 0 + fragment_type .......... esp + scan_type .............. subset scan of table TRAFODION.SCH.T140B + object_type ............ Trafodion + cache_size ........... 100 + probes ................. 1 + rows_accessed ........ 100 + column_retrieved ....... #1:3 + key_columns ............ _SALT_, UNIQ, UNIQ2 + begin_key .............. (_SALT_ = (\:_sys_HostVarLoHashPart Hash2Distrib + 2)), (UNIQ = <min>), (UNIQ2 = <min>) + end_key ................ (_SALT_ = (\:_sys_HostVarHiHashPart Hash2Distrib + 2)), (UNIQ = <max>), (UNIQ2 = <max>) + +--- SQL operation complete. +>>select avg(a) from t140b; + +(EXPR) +-------------------- + + 45 + +--- 1 row(s) selected. 
+>> >>obey TEST140(clnup); >>drop table t140helper; http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/regress/executor/TEST140 ---------------------------------------------------------------------- diff --git a/core/sql/regress/executor/TEST140 b/core/sql/regress/executor/TEST140 index 1540522..424b12a 100644 --- a/core/sql/regress/executor/TEST140 +++ b/core/sql/regress/executor/TEST140 @@ -21,7 +21,7 @@ -- -- @@@ END COPYRIGHT @@@ -- --- Functionality: Advanced predicate pushdown (V2) +-- Functionality: Advanced predicate pushdown (V2) and parallelScanner -- Expected files: EXPECTED140 -- Table created: t140 t140b t140helper -- Limitations: @@ -63,6 +63,13 @@ insert into t140 values(14,14,42,402,4002,43,403,4003,NULL); insert into t140 values(15,15,52,502,5002,53,503,5003,'asd'); update t140 set d = null where uniq = 15; +upsert using load +into t140b + select x1,x1,10*x1,100*x1,1000*x1,10*x1+1,100*x1+1, 1000*x1+1, 'aaa'||cast(x1 as varchar(10)) + + from t140helper +transpose 0,1,2,3,4,5,6,7,8,9 as x1; + ?section run -- test returned rows with or without adding key column and test of all pushdown functions with null or non null column -- only one column retrieved @@ -94,3 +101,22 @@ explain select an from t140 where an between 20 and 40; select an from t140 where an between 20 and 40; select an from t140 where an in (21,41,51,61,10); +-- test parallel scanner +-- turn off small scanner as it will force single scanner +cqd hbase_small_scanner 'OFF'; +cqd parallel_num_esps '1'; +-- force 2 threads +cqd hbase_dop_parallel_scanner '2.0'; +explain select avg(a) from t140b; +select avg(a) from t140b; +-- force 100% of threads (with 2 partition this is 2 threads) +cqd hbase_dop_parallel_scanner '1.0'; +explain select avg(a) from t140b; +select avg(a) from t140b; +-- reset to regular scanner +cqd hbase_dop_parallel_scanner reset; +cqd hbase_small_scanner reset; +cqd parallel_num_esps reset; +explain select avg(a) from t140b; +select avg(a) 
from t140b; + http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/sqlcomp/DefaultConstants.h ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h index 5c0d13b..d284fc1 100644 --- a/core/sql/sqlcomp/DefaultConstants.h +++ b/core/sql/sqlcomp/DefaultConstants.h @@ -3808,6 +3808,10 @@ enum DefaultConstants // Currently syskey, _salt_, _division_. TRAF_ALLOW_RESERVED_COLNAMES, + //if 0, regular scanner is used. From 0.x to 1.0, percentage of regions that need to be scanned that will be done in parallel. + //if >= 2, set a fixed number of thread, real DOP. 2.0 2 thread, 3.0 3 thread etc. + HBASE_DOP_PARALLEL_SCANNER, + // This enum constant must be the LAST one in the list; it's a count, // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)! __NUM_DEFAULT_ATTRIBUTES http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/sqlcomp/nadefaults.cpp ---------------------------------------------------------------------- diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp index 9d53aea..2b2c3ed 100644 --- a/core/sql/sqlcomp/nadefaults.cpp +++ b/core/sql/sqlcomp/nadefaults.cpp @@ -1757,7 +1757,7 @@ SDDkwd__(EXE_DIAGNOSTIC_EVENTS, "OFF"), // We can remove this once the delete costing code has broader // exposure. 
DDkwd__(HBASE_DELETE_COSTING, "ON"), - + DDflt0_(HBASE_DOP_PARALLEL_SCANNER, "0."), DDkwd__(HBASE_FILTER_PREDS, "OFF"), DDkwd__(HBASE_HASH2_PARTITIONING, "ON"), DDui___(HBASE_INDEX_LEVEL, "0"), http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/src/main/java/org/trafodion/sql/HTableClient.java ---------------------------------------------------------------------- diff --git a/core/sql/src/main/java/org/trafodion/sql/HTableClient.java b/core/sql/src/main/java/org/trafodion/sql/HTableClient.java index 0760b90..4a9bc07 100644 --- a/core/sql/src/main/java/org/trafodion/sql/HTableClient.java +++ b/core/sql/src/main/java/org/trafodion/sql/HTableClient.java @@ -778,6 +778,7 @@ public class HTableClient { Object[] colNamesToFilter, Object[] compareOpList, Object[] colValuesToCompare, + float dopParallelScanner, float samplePercent, boolean inPreFetch, boolean useSnapshotScan, @@ -979,9 +980,9 @@ public class HTableClient { if (useTRexScanner && (transID != 0)) { scanner = table.getScanner(transID, scan); } else { - scanner = table.getScanner(scan); - } - if (logger.isTraceEnabled()) logger.trace("startScan(). After getScanner. Scanner: " + scanner); + scanner = table.getScanner(scan,dopParallelScanner); + } + if (logger.isTraceEnabled()) logger.trace("startScan(). After getScanner. Scanner: " + scanner+" dop:"+dopParallelScanner); } else {
