Repository: incubator-trafodion
Updated Branches:
  refs/heads/master cae7aba40 -> 536c51d54


[TRAFODION-1421] Implement parallel scanner primitive
new CQD: hbase_dop_parallel_scanner
- default 0.0 = disabled
- from 0.x to 1.0 -> the DOP is assessed dynamically as a percentage of the 
regions to scan. For example, with 10 regions and the CQD set to 0.3 (30%), 
3 threads will be used.
- from 2.0 and above -> the ceiling of the value is used as a fixed number of 
threads.
The advantage of parallel-scanner parallelism over ESP parallelism is resource 
consumption: the parallel scanner only creates threads, while ESPs are full 
processes with high memory consumption.
For now, this feature is just a primitive that can be manually exercised. The 
next obvious step is to make the compiler aware of it and pick it over ESP DOP 
when appropriate...


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/a7f9a628
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/a7f9a628
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/a7f9a628

Branch: refs/heads/master
Commit: a7f9a628179fcd4077a566f558e653b1380eabc6
Parents: b7ecf18
Author: Eric Owhadi <[email protected]>
Authored: Mon Apr 11 16:59:23 2016 +0000
Committer: Eric Owhadi <[email protected]>
Committed: Mon Apr 11 16:59:23 2016 +0000

----------------------------------------------------------------------
 .../hbase/client/ClientScanner98.java.tmpl      | 490 +++++++++++++++++++
 .../client/TrafParallelClientScanner.java.tmpl  | 325 ++++++++++++
 .../hbase/client/transactional/RMInterface.java |   4 +-
 .../transactional/SsccTransactionalTable.java   |   8 +-
 .../transactional/TransactionalTable.java       |   8 +-
 .../transactional/TransactionalTableClient.java |   3 +-
 core/sql/comexe/ComTdbHbaseAccess.h             |   4 +
 core/sql/executor/ExHbaseIUD.cpp                |   6 +-
 core/sql/executor/ExHbaseSelect.cpp             |   3 +
 core/sql/executor/HBaseClient_JNI.cpp           |   5 +-
 core/sql/executor/HBaseClient_JNI.h             |   1 +
 core/sql/exp/ExpHbaseInterface.cpp              |   4 +-
 core/sql/exp/ExpHbaseInterface.h                |   2 +
 core/sql/generator/GenExplain.cpp               |   7 +
 core/sql/generator/GenRelScan.cpp               |   1 +
 core/sql/generator/Generator.cpp                |   3 +
 core/sql/generator/Generator.h                  |   2 +-
 core/sql/regress/executor/EXPECTED140           | 435 +++++++++++++++-
 core/sql/regress/executor/TEST140               |  28 +-
 core/sql/sqlcomp/DefaultConstants.h             |   4 +
 core/sql/sqlcomp/nadefaults.cpp                 |   2 +-
 .../java/org/trafodion/sql/HTableClient.java    |   7 +-
 22 files changed, 1318 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl
 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl
new file mode 100644
index 0000000..81cec3f
--- /dev/null
+++ 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl
@@ -0,0 +1,490 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
+import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ClientScanner98 extends AbstractClientScanner {
+    private final Log LOG = LogFactory.getLog(this.getClass());
+    protected Scan scan;
+    protected boolean closed = false;
+    // Current region scanner is against.  Gets cleared if current region goes
+    // wonky: e.g. if it splits on us.
+    protected HRegionInfo currentRegion = null;
+    protected ScannerCallable callable = null;
+    protected final LinkedList<Result> cache = new LinkedList<Result>();
+    protected final int caching;
+    protected long lastNext;
+    // Keep lastResult returned successfully in case we have to reset scanner.
+    protected Result lastResult = null;
+    protected final long maxScannerResultSize;
+    private final HConnection connection;
+    private final TableName tableName;
+    protected final int scannerTimeout;
+    protected boolean scanMetricsPublished = false;
+    protected RpcRetryingCaller<Result []> caller;
+    protected RpcControllerFactory rpcControllerFactory;
+
+    /**
+     * Creates a new scanner for the specified table. An HConnection is
+     * retrieved from {@link HConnectionManager} using the passed Configuration.
+     * Note that the passed {@link Scan}'s start row may be changed.
+     *
+     * @param conf The {@link Configuration} to use.
+     * @param scan {@link Scan} to use in this scanner
+     * @param tableName The table that we wish to scan
+     * @throws IOException if obtaining the connection or opening the first
+     *         region scanner fails
+     * @deprecated NOTE(review): annotated as deprecated without naming a
+     *             replacement; presumably
+     *             {@link #ClientScanner98(Configuration, Scan, TableName, HConnection)}
+     *             is preferred -- confirm.
+     */
+    @Deprecated
+    public ClientScanner98(final Configuration conf, final Scan scan,
+        final TableName tableName) throws IOException {
+      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
+    }
+
+    /**
+     * Creates a new scanner for the table whose name is given as raw bytes; the
+     * name is converted with {@link TableName#valueOf(byte[])}.
+     *
+     * @param conf The {@link Configuration} to use.
+     * @param scan {@link Scan} to use in this scanner
+     * @param tableName Name of the table that we wish to scan
+     * @throws IOException if obtaining the connection or opening the first
+     *         region scanner fails
+     * @deprecated Use {@link #ClientScanner98(Configuration, Scan, TableName)}
+     */
+    @Deprecated
+    public ClientScanner98(final Configuration conf, final Scan scan,
+        final byte [] tableName) throws IOException {
+      this(conf, scan, TableName.valueOf(tableName));
+    }
+
+
+    /**
+     * Creates a new scanner for the specified table over the given connection.
+     * The retrying-caller factory and the RPC controller factory are both
+     * instantiated from the passed configuration.
+     * Note that the passed {@link Scan}'s start row may be changed.
+     *
+     * @param conf The {@link Configuration} to use.
+     * @param scan {@link Scan} to use in this scanner
+     * @param tableName The table that we wish to scan
+     * @param connection Connection identifying the cluster
+     * @throws IOException if opening the first region scanner fails
+     */
+  public ClientScanner98(final Configuration conf, final Scan scan, final 
TableName tableName,
+      HConnection connection) throws IOException {
+    this(conf, scan, tableName, connection, 
RpcRetryingCallerFactory.instantiate(conf),
+        RpcControllerFactory.instantiate(conf));
+  }
+
+  /**
+   * Creates a new scanner for the table whose name is given as raw bytes, over
+   * the given connection.
+   * <p>
+   * Delegates to the {@link TableName} overload so that the retrying-caller
+   * factory is obtained through {@code RpcRetryingCallerFactory.instantiate(conf)},
+   * exactly like the other constructors of this class, rather than being
+   * hard-wired to {@code new RpcRetryingCallerFactory(conf)}.
+   *
+   * @param conf The {@link Configuration} to use.
+   * @param scan {@link Scan} to use in this scanner
+   * @param tableName Name of the table that we wish to scan
+   * @param connection Connection identifying the cluster
+   * @throws IOException if opening the first region scanner fails
+   * @deprecated Use {@link #ClientScanner98(Configuration, Scan, TableName, HConnection)}
+   */
+  @Deprecated
+  public ClientScanner98(final Configuration conf, final Scan scan, final byte [] tableName,
+      HConnection connection) throws IOException {
+    this(conf, scan, TableName.valueOf(tableName), connection);
+  }
+
+  /**
+   * Creates a new scanner using the given retrying-caller factory; the RPC
+   * controller factory is instantiated from the passed configuration.
+   *
+   * @param conf The {@link Configuration} to use.
+   * @param scan {@link Scan} to use in this scanner
+   * @param tableName The table that we wish to scan
+   * @param connection Connection identifying the cluster
+   * @param rpcFactory Factory used to create the retrying caller
+   * @throws IOException if opening the first region scanner fails
+   * @deprecated Use
+   *             {@link #ClientScanner98(Configuration, Scan, TableName, HConnection,
+   *             RpcRetryingCallerFactory, RpcControllerFactory)}
+   *             instead
+   */
+  @Deprecated
+  public ClientScanner98(final Configuration conf, final Scan scan, final 
TableName tableName,
+      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws 
IOException {
+    this(conf, scan, tableName, connection, rpcFactory, 
RpcControllerFactory.instantiate(conf));
+  }
+
+  /**
+   * Builds a scanner for the specified table and, as its last step, opens the
+   * scanner on the first region. Note that the start row of the passed
+   * {@link Scan} may be changed.
+   *
+   * @param conf The {@link Configuration} supplying the defaults for the maximum
+   *          result size, the scanner timeout and the caching value
+   * @param scan {@link Scan} to use in this scanner
+   * @param tableName The table that we wish to scan
+   * @param connection Connection identifying the cluster
+   * @param rpcFactory Factory used to create the retrying caller for scanner RPCs
+   * @param controllerFactory Factory handed to each {@link ScannerCallable}
+   * @throws IOException if opening the first region scanner fails
+   */
+  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
+      HConnection connection, RpcRetryingCallerFactory rpcFactory,
+      RpcControllerFactory controllerFactory) throws IOException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Scan table=" + tableName
+          + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
+    }
+    this.scan = scan;
+    this.tableName = tableName;
+    this.lastNext = System.currentTimeMillis();
+    this.connection = connection;
+
+    // A positive limit set on the Scan itself wins over the configured default.
+    this.maxScannerResultSize = scan.getMaxResultSize() > 0
+        ? scan.getMaxResultSize()
+        : conf.getLong(
+            HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+
+    this.scannerTimeout = HBaseConfiguration.getInt(conf,
+        HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+
+    // check if application wants to collect scan metrics
+    initScanMetrics(scan);
+
+    // Same rule for caching: a positive value on the Scan wins, otherwise the
+    // configured default is used.
+    this.caching = this.scan.getCaching() > 0
+        ? this.scan.getCaching()
+        : conf.getInt(
+            HConstants.HBASE_CLIENT_SCANNER_CACHING,
+            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+
+    this.caller = rpcFactory.<Result[]> newCaller();
+    this.rpcControllerFactory = controllerFactory;
+
+    initializeScannerInConstruction();
+  }
+
+    /**
+     * Opens the scanner on the first region. Called as the last step of the
+     * main constructor.
+     * NOTE(review): this is an overridable method invoked from a constructor;
+     * an override would run before the subclass's own fields are initialized.
+     *
+     * @throws IOException if the first region scanner cannot be opened
+     */
+    protected void initializeScannerInConstruction() throws IOException {
+      nextScanner(caching, false);
+    }
+
+    /**
+     * @return the connection this scanner was constructed with
+     */
+    protected HConnection getConnection() {
+      return connection;
+    }
+
+    /**
+     * Returns the name of the scanned table as raw bytes.
+     *
+     * @return Table name
+     * @deprecated Since 0.96.0; use {@link #getTable()}
+     */
+    @Deprecated
+    protected byte [] getTableName() {
+      return tableName.getName();
+    }
+
+    /**
+     * @return the name of the table this scanner reads from
+     */
+    protected TableName getTable() {
+      return tableName;
+    }
+
+    /**
+     * @return the {@link Scan} driving this scanner (the same instance that was
+     *         passed to the constructor, not a copy)
+     */
+    protected Scan getScan() {
+      return this.scan;
+    }
+
+    /**
+     * @return the {@code System.currentTimeMillis()} value recorded at
+     *         construction or after the most recent successful fetch in
+     *         {@code next()}
+     */
+    protected long getTimestamp() {
+      return this.lastNext;
+    }
+
+    // returns true if the passed region endKey
+    protected boolean checkScanStopRow(final byte [] endKey) {
+      if (this.scan.getStopRow().length > 0) {
+        // there is a stop row, check to see if we are past it.
+        byte [] stopRow = scan.getStopRow();
+        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
+          endKey, 0, endKey.length);
+        if (cmp <= 0) {
+          // stopRow <= endKey (endKey is equals to or larger than stopRow)
+          // This is a stop.
+          return true;
+        }
+      }
+      return false; //unlikely.
+    }
+
+    /**
+     * Gets a scanner for the next region.  If this.currentRegion != null, then
+     * we will move to the endrow of this.currentRegion.  Else we will get
+     * scanner at the scan.getStartRow().  We will go no further, just tidy
+     * up outstanding scanners, if <code>currentRegion != null</code> and
+     * <code>done</code> is true.
+     * @param nbRows caching value set on the newly created scanner callable
+     * @param done Server-side says we're done scanning.
+     * @return true if a scanner was opened on a region; false if the scan is
+     *         finished, in which case this scanner has been closed
+     * @throws IOException if closing the previous region scanner or opening the
+     *         next one fails; when opening fails this scanner is closed before
+     *         the exception is rethrown
+     */
+  protected boolean nextScanner(int nbRows, final boolean done)
+    throws IOException {
+      // Close the previous scanner if it's open
+      if (this.callable != null) {
+        this.callable.setClose();
+        this.caller.callWithRetries(callable,scannerTimeout);
+        this.callable = null;
+      }
+
+      // Where to start the next scanner
+      byte [] localStartKey;
+
+      // if we're at end of table, close and return false to stop iterating
+      if (this.currentRegion != null) {
+        byte [] endKey = this.currentRegion.getEndKey();
+        // Stop when the region has no/empty end key, when the scan's stop row
+        // is at or before this region's end key, or when the caller says done.
+        if (endKey == null ||
+            Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
+            checkScanStopRow(endKey) ||
+            done) {
+          close();
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Finished " + this.currentRegion);
+          }
+          return false;
+        }
+        // Continue with the region that starts where the current one ends.
+        localStartKey = endKey;
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Finished " + this.currentRegion);
+        }
+      } else {
+        // No current region (first call, or the scanner was reset by next()):
+        // start from the scan's start row.
+        localStartKey = this.scan.getStartRow();
+      }
+
+      if (LOG.isDebugEnabled() && this.currentRegion != null) {
+        // Only worth logging if NOT first region in scan.
+        LOG.debug("Advancing internal scanner to startKey at '" +
+          Bytes.toStringBinary(localStartKey) + "'");
+      }
+      try {
+        // Note: getScannerCallable also sets the scan's start row to localStartKey.
+        callable = getScannerCallable(localStartKey, nbRows);
+        // Open a scanner on the region server starting at the
+        // beginning of the region
+        this.caller.callWithRetries(callable,scannerTimeout);
+        this.currentRegion = callable.getHRegionInfo();
+        if (this.scanMetrics != null) {
+          this.scanMetrics.countOfRegions.incrementAndGet();
+        }
+      } catch (IOException e) {
+        close();
+        throw e;
+      }
+      return true;
+    }
+
+    @InterfaceAudience.Private
+    protected ScannerCallable getScannerCallable(byte [] localStartKey,
+        int nbRows) {
+      scan.setStartRow(localStartKey);
+      ScannerCallable s =
+              new ScannerCallable((ClusterConnection)getConnection(), 
getTable(), scan, this.scanMetrics,
+                  this.rpcControllerFactory);
+      s.setCaching(nbRows);
+      return s;
+    }
+
+    /**
+     * Publish the scan metrics. For now, we use scan.setAttribute to pass the 
metrics back to the
+     * application or TableInputFormat.Later, we could push it to other 
systems. We don't use metrics
+     * framework because it doesn't support multi-instances of the same 
metrics on the same machine;
+     * for scan/map reduce scenarios, we will have multiple scans running at 
the same time.
+     *
+     * By default, scan metrics are disabled; if the application wants to 
collect them, this behavior
+     * can be turned on by calling calling:
+     *
+     * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, 
Bytes.toBytes(Boolean.TRUE))
+     */
+    protected void writeScanMetrics() {
+      if (this.scanMetrics == null || scanMetricsPublished) {
+        return;
+      }
+      MapReduceProtos.ScanMetrics pScanMetrics = 
ProtobufUtil.toScanMetrics(scanMetrics);
+      scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, 
pScanMetrics.toByteArray());
+      scanMetricsPublished = true;
+    }
+
+    /**
+     * Returns the next {@link Result}, serving it from the local cache and
+     * refilling the cache from the region server(s) when it is empty.
+     * <p>
+     * A refill keeps fetching, moving on to the next region through
+     * nextScanner when needed, until {@code caching} rows have been collected,
+     * {@code maxScannerResultSize} bytes have been consumed, or nextScanner
+     * reports that there is nothing more to scan.
+     *
+     * @return the next row, or null if the cache is empty and the scanner is
+     *         closed or exhausted
+     * @throws IOException if a scanner RPC fails with an error that is not
+     *         handled by resetting the scanner
+     */
+    @Override
+    public Result next() throws IOException {
+      // If the scanner is closed and there's nothing left in the cache, next 
is a no-op.
+      if (cache.size() == 0 && this.closed) {
+        return null;
+      }
+      if (cache.size() == 0) {
+        Result [] values = null;
+        long remainingResultSize = maxScannerResultSize;
+        int countdown = this.caching;
+        // We need to reset it if it's a new callable that was created
+        // with a countdown in nextScanner
+        callable.setCaching(this.caching);
+        // This flag is set when we want to skip the result returned.  We do
+        // this when we reset scanner because it split under us.
+        boolean skipFirst = false;
+        boolean retryAfterOutOfOrderException  = true;
+        do {
+          try {
+            if (skipFirst) {
+              // Skip only the first row (which was the last row of the last
+              // already-processed batch).
+              callable.setCaching(1);
+              values = this.caller.callWithRetries(callable,scannerTimeout);
+              callable.setCaching(this.caching);
+              skipFirst = false;
+            }
+            // Server returns a null values if scanning is to stop.  Else,
+            // returns an empty array if scanning is to go on and we've just
+            // exhausted current region.
+            values = this.caller.callWithRetries(callable,scannerTimeout);
+            // NOTE(review): skipFirst is always false at this point (it is only
+            // set in the catch block below and is cleared by the block above
+            // before this line is reached), so this branch looks unreachable.
+            if (skipFirst && values != null && values.length == 1) {
+              skipFirst = false; // Already skipped, unset it before scanning 
again
+              values = this.caller.callWithRetries(callable,scannerTimeout);
+            }
+            retryAfterOutOfOrderException  = true;
+          } catch (DoNotRetryIOException e) {
+            // DNRIOEs are thrown to make us break out of retries.  Some types 
of DNRIOEs want us
+            // to reset the scanner and come back in again.
+            if (e instanceof UnknownScannerException) {
+              // NOTE(review): 'timeout' is unused while the check below stays
+              // commented out; as written, an UnknownScannerException always
+              // resets the scanner instead of raising a ScannerTimeoutException.
+              long timeout = lastNext + scannerTimeout;
+              // If we are over the timeout, throw this exception to the 
client wrapped in
+              // a ScannerTimeoutException. Else, it's because the region 
moved and we used the old
+              // id against the new region server; reset the scanner.
+              //if (timeout < System.currentTimeMillis()) {
+                //long elapsed = System.currentTimeMillis() - lastNext;
+                //ScannerTimeoutException ex = new ScannerTimeoutException(
+                //    elapsed + "ms passed since the last invocation, " +
+                //        "timeout is currently set to " + scannerTimeout);
+                //ex.initCause(e);
+                //throw ex;
+              //}
+            } else {
+              // If exception is any but the list below throw it back to the 
client; else setup
+              // the scanner and retry.
+              Throwable cause = e.getCause();
+              if ((cause != null && cause instanceof 
NotServingRegionException) ||
+                (cause != null && cause instanceof 
RegionServerStoppedException) ||
+                e instanceof OutOfOrderScannerNextException) {
+                // Pass
+                // It is easier writing the if loop test as list of what is 
allowed rather than
+                // as a list of what is not allowed... so if in here, it means 
we do not throw.
+              } else {
+                throw e;
+              }
+            }
+            // Else, its signal from depths of ScannerCallable that we need to 
reset the scanner.
+            if (this.lastResult != null) {
+              // The region has moved. We need to open a brand new scanner at
+              // the new location.
+              // Reset the startRow to the row we've seen last so that the new
+              // scanner starts at the correct row. Otherwise we may see 
previously
+              // returned rows again.
+              // (ScannerCallable by now has "relocated" the correct region)
+              this.scan.setStartRow(this.lastResult.getRow());
+
+              // Skip first row returned.  We already let it out on previous
+              // invocation.
+              skipFirst = true;
+            }
+            if (e instanceof OutOfOrderScannerNextException) {
+              // Tolerate one OutOfOrderScannerNextException; a second one
+              // without a successful call in between is fatal.
+              if (retryAfterOutOfOrderException) {
+                retryAfterOutOfOrderException = false;
+              } else {
+                // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE?
+                throw new DoNotRetryIOException("Failed after retry of " +
+                  "OutOfOrderScannerNextException: was there a rpc timeout?", 
e);
+              }
+            }
+            // Clear region.
+            this.currentRegion = null;
+            // Set this to zero so we don't try and do an rpc and close on 
remote server when
+            // the exception we got was UnknownScanner or the Server is going 
down.
+            callable = null;
+            // This continue will take us to while at end of loop where we 
will set up new scanner.
+            continue;
+          }
+          long currentTime = System.currentTimeMillis();
+          if (this.scanMetrics != null ) {
+            
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
+          }
+          lastNext = currentTime;
+          if (values != null && values.length > 0) {
+            // Cache every returned row, charging its cells against the size
+            // budget and the row itself against the row-count budget.
+            for (Result rs : values) {
+              cache.add(rs);
+              for (Cell kv : rs.rawCells()) {
+                // TODO make method in Cell or CellUtil
+                remainingResultSize -= 
KeyValueUtil.ensureKeyValue(kv).heapSize();
+              }
+              countdown--;
+              this.lastResult = rs;
+            }
+          }
+          // Values == null means server-side filter has determined we must 
STOP
+        } while (remainingResultSize > 0 && countdown > 0 && 
nextScanner(countdown, values == null));
+      }
+
+      if (cache.size() > 0) {
+        return cache.poll();
+      }
+
+      // if we exhausted this scanner before calling close, write out the scan 
metrics
+      writeScanMetrics();
+      return null;
+    }
+
+    /**
+     * Closes this scanner: publishes the scan metrics if they have not been
+     * published yet, asks the region server to close the currently open region
+     * scanner (if any) and marks this scanner as closed.
+     * <p>
+     * No IOException is propagated to the caller; a failure of the close RPC is
+     * either ignored (unknown scanner) or logged with its stack trace.
+     */
+    @Override
+    public void close() {
+      if (!scanMetricsPublished) {
+        writeScanMetrics();
+      }
+      if (callable != null) {
+        callable.setClose();
+        try {
+          this.caller.callWithRetries(callable,scannerTimeout);
+        } catch (UnknownScannerException ignored) {
+           // We used to catch this error, interpret, and rethrow. However, we
+           // have since decided that it's not nice for a scanner's close to
+           // throw exceptions. Chances are it was just due to lease time out.
+        } catch (IOException e) {
+           /* An exception other than UnknownScanner is unexpected. */
+           // Hand the exception to the logger as the Throwable argument so the
+           // stack trace is kept instead of only its toString().
+           LOG.warn("scanner failed to close.", e);
+        }
+        callable = null;
+      }
+      closed = true;
+    }
+#ifndef CDH1.0
+    /**
+     * Issues a scanner call that fetches no rows, to keep the currently open
+     * region scanner alive.
+     *
+     * @return true if the call completed without an exception; false if there
+     *         is no open region scanner or the call failed
+     */
+    @Override
+    public boolean renewLease() {
+      if (callable == null) {
+        return false;
+      }
+      // do not return any rows, do not advance the scanner
+      callable.setCaching(0);
+      try {
+        this.caller.callWithoutRetries(callable, this.scannerTimeout);
+        return true;
+      } catch (Exception e) {
+        return false;
+      } finally {
+        // Always restore the regular caching value for subsequent next() calls.
+        callable.setCaching(this.caching);
+      }
+    }
+#endif
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/TrafParallelClientScanner.java.tmpl
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/TrafParallelClientScanner.java.tmpl
 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/TrafParallelClientScanner.java.tmpl
new file mode 100644
index 0000000..50842fc
--- /dev/null
+++ 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/TrafParallelClientScanner.java.tmpl
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+//import org.apache.hadoop.hbase.client.ClusterConnection;//for HBase 1.x
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all in parallel.
+ */
+public class TrafParallelClientScanner extends AbstractClientScanner 
implements Closeable {
+  private final Log LOG = LogFactory.getLog(this.getClass());
+
+  static private Configuration       config = HBaseConfiguration.create();
+  
+  // special marker to indicate when a scanning task has finished
+  protected static final DoneMarker MARKER = new DoneMarker();
+
+  // the single reader object to use
+  protected ResultReader reader;
+
+  // the thread pool to be used
+  protected ExecutorService pool;
+  
+  // should store hbase.client.replicaCallTimeout.scan (HBase 1.x)
+  //static private int replicaCallTimeoutMicroSecondScan;
+
+  // The default below is duplicated from TableConfiguration, which cannot be
+  // instantiated because it has no public constructor.
+  // static {
+  //   replicaCallTimeoutMicroSecondScan =
+  //       config.getInt("hbase.client.replicaCallTimeout.scan", 1000000);
+  // }
+  
+  /**
+   * If a caller has predetermined split points this constructor can be used.
+   * One thread per split will be used to execute the scan.
+   * @param connection The cluster connection to use
+   * @param scans The {@link Scan} objects defining the split points
+   * @param tableName The table to use
+   * @param bufferSize How many results to buffer
+   * (should be larger than the individual scans' caching setting)
+   * @throws IOException
+   */
+  public TrafParallelClientScanner(final HConnection connection, final 
Collection<Scan> scans,
+      final TableName tableName, final int bufferSize) throws IOException {
+    createDefaultPool(scans.size());
+    SingleReaderMultiWriterQueue rw = new 
SingleReaderMultiWriterQueue(bufferSize, scans.size());
+    this.reader = rw;
+    for (Scan s : scans) {
+      pool.submit(new ScanTask(s, connection, tableName, rw,0));
+    }
+  }
+
+  /**
+   * Execute a single scan across multiple threads. Results are returned when
+   * they are available, NOT in rowkey order.
+   * <p>
+   * The scan is split into one sub-scan per region found in the requested row
+   * range. Sub-scans are grouped by region location and submitted to the pool
+   * one per group in turn, so that no group is queued entirely ahead of the others.
+   * @param connection The cluster connection to use
+   * @param scan The scan to execute
+   * @param tableName The table to use
+   * @param parallelScaling The scaling factor to use. If greater than 0 and at
+   * most 1, that fraction of the regions is scanned in parallel (rounded up);
+   * if greater than 1, it is rounded up and used as a fixed number of threads.
+   * The number of threads never exceeds the number of regions to scan.
+   * @throws IllegalArgumentException if parallelScaling is not a number greater than 0
+   * @throws IOException if opening the table or looking up its regions fails
+   */
+  public TrafParallelClientScanner(final HConnection connection, final Scan scan,
+      final TableName tableName, final float parallelScaling) throws IOException {
+    // "!(x > 0)" also rejects NaN, which "x <= 0" would let through
+    if (!(parallelScaling > 0))
+      throw new IllegalArgumentException("parallelScaling must be > 0, got " + parallelScaling);
+
+    // get the region boundaries. null ES is fine, we're just using the table
+    // to lookup the region cache
+    HTable t = new HTable(tableName, connection, null);
+    List<HRegionLocation> locs;
+    try {
+      locs = t.getRegionsInRange(scan.getStartRow(), scan.getStopRow());
+    } finally {
+      t.close();
+    }
+    LOG.debug("Found "+locs.size()+" region(s).");
+    if (locs.isEmpty()) {
+      // Nothing to scan. Leave the scanner in a usable state anyway: next() has
+      // to report end-of-scan and close() must not fail, so install an empty
+      // queue with no pending tasks and a pool that is already shut down.
+      this.reader = new SingleReaderMultiWriterQueue(1, 0);
+      createDefaultPool(1);
+      pool.shutdown();
+      return;
+    }
+
+    Map<HRegionLocation, Queue<Scan>> tasks =
+        new HashMap<HRegionLocation, Queue<Scan>>(locs.size());
+    int i=0;
+    // organize region locations by region server
+    for (HRegionLocation loc : locs) {
+      Scan s = new Scan(scan);
+      // the first and the last sub-scan keep the caller's own boundaries,
+      // all others are clipped to the region boundaries
+      s.setStartRow(i==0?scan.getStartRow() : loc.getRegionInfo().getStartKey());
+      i++;
+      s.setStopRow(i==locs.size()?scan.getStopRow() : loc.getRegionInfo().getEndKey());
+      addToMapOfQueues(tasks, loc, s);
+    }
+
+    int requested = parallelScaling>1.0 ?(int)Math.ceil(parallelScaling) :
+        (int)Math.ceil(locs.size() * parallelScaling);
+    // There is exactly one task per region, so any thread beyond that count
+    // would only be started to sit idle.
+    int threads = Math.min(requested, locs.size());
+
+    createDefaultPool(threads);
+    SingleReaderMultiWriterQueue rw = new SingleReaderMultiWriterQueue(
+        Math.max(scan.getCaching() * threads, threads), locs.size());
+    this.reader = rw;
+
+    LOG.debug("Scheduling "+threads+" thread(s).");
+    // round robin among region servers
+    int taskNb=1;
+    while(!tasks.isEmpty()) {
+      for (Iterator<Map.Entry<HRegionLocation, Queue<Scan>>> it =
+          tasks.entrySet().iterator(); it.hasNext();) {
+        Scan next = it.next().getValue().poll();
+        if (next == null) {
+          // this group has no sub-scan left
+          it.remove();
+        } else {
+          pool.submit(new ScanTask(next, connection, tableName, rw, taskNb));
+          taskNb++;
+        }
+      }
+    }
+  }
+
+  private <K, V> Queue<V> addToMapOfQueues(Map<K, Queue<V>> map, K key, V 
value) {
+    Queue<V> values = map.get(key);
+    if (values == null) {
+      values = new ArrayDeque<V>();
+      map.put(key, values);
+    }
+    values.add(value);
+    return values;
+  }
+
+  /**
+   * Creates the fixed-size pool of daemon threads that runs the scan tasks and
+   * stores it in {@link #pool}. All threads are started up front.
+   * @param threads number of worker threads; must be at least 1
+   */
+  protected void createDefaultPool(int threads) {
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(threads, threads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        Threads.newDaemonThreadFactory("traf-parallel-scan"));
+    // Every thread is a core thread, so without this the 60 second keep-alive
+    // is never applied and idle workers stay alive until close() is called.
+    executor.allowCoreThreadTimeOut(true);
+    executor.prestartAllCoreThreads();
+    pool = executor;
+  }
+
+  /* actual scanner methods */
+  /**
+   * Returns the next available result from any of the parallel scan tasks.
+   * When reading fails, the scanner is closed before the failure is rethrown.
+   * @return the next result, or {@code null} when there is nothing left to read
+   * @throws IOException if a scan task reported a failure or the wait for a
+   * result was interrupted
+   */
+  @Override
+  public Result next() throws IOException {
+    if (reader == null) {
+      // no scan task was ever set up for this scanner, so there is no data
+      return null;
+    }
+    try {
+      return reader.take();
+    } catch (IOException iox) {
+      // kill other threads
+      close();
+      // rethrow
+      throw iox;
+    }
+  }
+
+  /**
+   * close the parallel scanner, callers are strongly encouraged to call this method
+   * doesn't wait until the threadpool actually closes
+   */
+  @Override
+  public void close() {
+    if (pool == null) {
+      // no pool was ever created for this scanner, nothing to stop
+      return;
+    }
+    // interrupt all running threads, don't wait for completion
+    pool.shutdownNow();
+  }
+#ifndef CDH1.0
+  @Override
+  public boolean renewLease() {return false;}//fake, never needed.
+#endif
+  ///// Helper classes and interfaces /////
+
+  /** Reader interface: the consuming side of the result hand-off. */
+  private interface ResultReader {
+    /** Fetch the next result. */
+    Result take() throws IOException;
+  }
+  /** Writer interface: the producing side of the result hand-off. */
+  private interface ResultWriter {
+    /** Write a new result. */
+    void put(Result e) throws InterruptedException;
+
+    /** The writer encountered an exception; pass it on to the reader. */
+    void exception(Exception x);
+
+    /**
+     * A writer thread is done. Must be called by each thread that is writing
+     * to this ResultWriter.
+     */
+    void done();
+  }
+
+  /**
+   * A bounded queue with a single reader and multiple writers, handing Results
+   * to the reader as soon as they become available. Every writer finishes by
+   * queueing a {@link DoneMarker}; the reader counts those markers down to
+   * know when all writers are finished.
+   */
+  private static class SingleReaderMultiWriterQueue implements ResultReader, ResultWriter {
+    private final BlockingDeque<Result> queue;
+    // number of writers whose end marker has not been consumed yet;
+    // only read and modified by take()
+    private int taskCount;
+
+    /**
+     * @param capacity maximum number of queued entries; must be at least 1
+     * @param taskCount number of writers that will signal completion
+     */
+    public SingleReaderMultiWriterQueue(int capacity, int taskCount) {
+      queue = new LinkedBlockingDeque<Result>(capacity);
+      this.taskCount = taskCount;
+    }
+
+    // writer --
+    @Override
+    public void put(Result e) throws InterruptedException {
+      queue.put(e);
+    }
+
+    @Override
+    public void exception(Exception x) {
+      try {
+        // the failure travels to the reader inside an end marker
+        queue.put(new DoneMarker(x));
+      } catch (InterruptedException ix) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    @Override
+    public void done() {
+      try {
+        queue.put(MARKER);
+      } catch (InterruptedException ix) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    // reader --
+    /**
+     * Pops the next Result as it becomes available, blocking while writers
+     * are still outstanding.
+     * @return the next Result, or {@code null} once every writer has finished
+     * and the queue is empty
+     * @throws IOException if a writer reported a failure, or if the calling
+     * thread was interrupted while waiting (its interrupt status is kept set)
+     */
+    @Override
+    public Result take() throws IOException {
+      while (taskCount > 0) {
+        Result r;
+        try {
+          // block if there are still tasks running
+          r = queue.take();
+        } catch (InterruptedException x) {
+          // keep the interrupt visible to the caller instead of swallowing it
+          Thread.currentThread().interrupt();
+          throw new IOException(x);
+        }
+        if (!(r instanceof DoneMarker)) {
+          return r;
+        }
+        // a writer finished: surface its failure, if any, otherwise count it off
+        ((DoneMarker)r).rethrow();
+        taskCount--;
+      }
+      // no writer left; whatever is still queued can be read without blocking
+      return queue.poll();
+    }
+  }
+
+  /**
+   * Marker entry queued when a scanning task ends. It optionally carries the
+   * exception that ended the task so it can be passed along to the reader.
+   */
+  protected static class DoneMarker extends Result {
+    private final Exception x;
+
+    /** Marker without an exception attached. */
+    public DoneMarker() {
+      this(null);
+    }
+
+    /** Marker carrying the given exception. */
+    public DoneMarker(Exception x) {
+      this.x = x;
+    }
+
+    /**
+     * Throws the carried exception, if there is one. An IOException is thrown
+     * as is; any other exception is wrapped in an IOException.
+     */
+    public void rethrow() throws IOException {
+      if (x == null) {
+        return;
+      }
+      if (x instanceof IOException) {
+        throw (IOException)x;
+      }
+      throw new IOException(x);
+    }
+  }
+
+  /**
+   * Task executing a scan request: runs one scan and feeds every row to the
+   * writer, then signals completion or passes on the exception that ended it.
+   */
+  protected class ScanTask implements Runnable {
+    private final Scan s;
+    private final ResultWriter writer;
+    private final HConnection connection;
+    private final TableName tableName;
+    private final int taskNb;
+
+    public ScanTask(Scan s, HConnection connection, TableName tableName,
+        ResultWriter writer, int taskNb) {
+      this.s = s;
+      this.connection = connection;
+      this.tableName = tableName;
+      this.writer = writer;
+      this.taskNb = taskNb;
+    }
+
+    @Override
+    public void run() {
+      Thread current = Thread.currentThread();
+      try {
+        // for HBase 1.x
+        //ClientScanner98 scanner = new ClientScanner98(connection.getConfiguration(), s, tableName,
+        //    (ClusterConnection)connection,
+        //    ((ClusterConnection)connection).getNewRpcRetryingCallerFactory(config),
+        //    RpcControllerFactory.instantiate(config));
+        //ClientScanner scanner = new ClientScanner(connection.getConfiguration(), s, tableName, connection);
+        ClientScanner98 scanner = new ClientScanner98(config, s, tableName, connection);
+        try {
+          Result row;
+          while((row = scanner.next()) != null && !current.isInterrupted()) {
+            writer.put(row);
+          }
+        } finally {
+          // close the scanner even when the scan is cut short by an exception
+          scanner.close();
+        }
+        writer.done();
+      } catch (Exception x) {
+        if (x instanceof InterruptedException) {
+          // Catching the exception cleared the interrupt status. Restore it
+          // before reporting, so that a put on a full queue fails fast instead
+          // of blocking forever on a queue that nobody drains any more.
+          current.interrupt();
+        }
+        // record any exceptions encountered
+        writer.exception(x);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
index b7481b9..a0e92ff 100644
--- 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
+++ 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java
@@ -423,9 +423,9 @@ public class RMInterface {
     {
         return ttable.getTableName();
     }
-    public ResultScanner getScanner(Scan scan) throws IOException
+    public ResultScanner getScanner(Scan scan, float dopParallelScanner) 
throws IOException
     {
-        return ttable.getScanner(scan);
+        return ttable.getScanner(scan, dopParallelScanner);
     }
     public Result get(Get g) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
index 40b2eac..31e6d1b 100644
--- 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
+++ 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/SsccTransactionalTable.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TrafParallelClientScanner;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
@@ -732,9 +733,12 @@ if (LOG.isTraceEnabled()) LOG.trace("checkAndPut, seting 
request startid: " + tr
     {
         return super.getTableName();
     }
-    public ResultScanner getScanner(Scan scan) throws IOException
+    public ResultScanner getScanner(Scan scan, float DOPparallelScanner) 
throws IOException
     {
-        return super.getScanner(scan);
+        if (scan.isSmall() || DOPparallelScanner == 0)
+            return super.getScanner(scan);
+        else
+            return new TrafParallelClientScanner(this.connection, scan, 
getName(), DOPparallelScanner);       
     }
     public Result get(Get g) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
index 609d9b6..b295171 100644
--- 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
+++ 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TrafParallelClientScanner;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos;
 import 
org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.CheckAndDeleteRequest;
@@ -806,9 +807,12 @@ public HRegionLocation getRegionLocation(byte[] row, 
boolean f)
     {
         return super.getTableName();
     }
-    public ResultScanner getScanner(Scan scan) throws IOException
+    public ResultScanner getScanner(Scan scan, float DOPparallelScanner) 
throws IOException
     {
-        return super.getScanner(scan);
+        if (scan.isSmall() || DOPparallelScanner == 0)
+            return super.getScanner(scan);
+        else
+            return new TrafParallelClientScanner(this.connection, scan, 
getName(), DOPparallelScanner);       
     }
     public Result get(Get g) throws IOException
     {

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java
----------------------------------------------------------------------
diff --git 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java
 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java
index 187a15f..76e61d6 100644
--- 
a/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java
+++ 
b/core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/TransactionalTableClient.java
@@ -136,9 +136,8 @@ public interface  TransactionalTableClient  {
 
     byte[] getTableName();
 
-    ResultScanner getScanner(Scan scan) throws IOException;
+    ResultScanner getScanner(Scan scan, float DOPparallelScanner) throws 
IOException;
     Result get(Get g) throws IOException; 
-
     
     Result[] get( List<Get> g) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/comexe/ComTdbHbaseAccess.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHbaseAccess.h 
b/core/sql/comexe/ComTdbHbaseAccess.h
index c38f2f4..c785a8d 100644
--- a/core/sql/comexe/ComTdbHbaseAccess.h
+++ b/core/sql/comexe/ComTdbHbaseAccess.h
@@ -288,6 +288,9 @@ public:
     void setMaxNumRowsPerHbaseBlock(UInt32 n) { maxNumRowsPerHBaseBlock_ = n;}
     UInt32 maxNumRowsPerHbaseBlock() { return maxNumRowsPerHBaseBlock_; }
 
+    void setDopParallelScanner(Float32 f) { dopParallelScanner_ = f;}
+    Float32 dopParallelScanner() { return dopParallelScanner_; }
+
   private:
     enum
     {
@@ -301,6 +304,7 @@ public:
     UInt32 flags_;
     UInt32 numCacheRows_;
     UInt32 maxNumRowsPerHBaseBlock_;
+    Float32 dopParallelScanner_;
   };
 
   // ---------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/executor/ExHbaseIUD.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseIUD.cpp b/core/sql/executor/ExHbaseIUD.cpp
index 262505b..bf3103a 100644
--- a/core/sql/executor/ExHbaseIUD.cpp
+++ b/core/sql/executor/ExHbaseIUD.cpp
@@ -2786,7 +2786,8 @@ ExWorkProcRetcode ExHbaseUMDtrafSubsetTaskTcb::work(short 
&rc)
                                           
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(),
                                           
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(),
                                           
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(),
-                                          FALSE, NULL, NULL, NULL);
+                                          FALSE, NULL, NULL, NULL,
+                          
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner());
            if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
              step_ = HANDLE_ERROR;
            else
@@ -3214,7 +3215,8 @@ ExWorkProcRetcode 
ExHbaseUMDnativeSubsetTaskTcb::work(short &rc)
                                           
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(),
                                           
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(),
                                           
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(),
-                                          FALSE, NULL, NULL, NULL);
+                                          FALSE, NULL, NULL, NULL,
+                          
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner());
            if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
              step_ = HANDLE_ERROR;
            else

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/executor/ExHbaseSelect.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseSelect.cpp 
b/core/sql/executor/ExHbaseSelect.cpp
index 2bd904b..f7d1ba7 100644
--- a/core/sql/executor/ExHbaseSelect.cpp
+++ b/core/sql/executor/ExHbaseSelect.cpp
@@ -90,6 +90,7 @@ ExWorkProcRetcode ExHbaseScanTaskTcb::work(short &rc)
                                            &tcb_->hbaseFilterOps_ : NULL),
                                           (tcb_->hbaseFilterValues_.entries() 
> 0 ?
                                            &tcb_->hbaseFilterValues_ : NULL),
+                       
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(),
                                           tcb_->getSamplePercentage(),
                                            FALSE, 0, NULL, NULL, 0,
                                            
(tcb_->hbaseAccessTdb().getHbaseAccessOptions() 
@@ -275,6 +276,7 @@ ExWorkProcRetcode ExHbaseScanRowwiseTaskTcb::work(short &rc)
                                            &tcb_->hbaseFilterOps_ : NULL),
                                           (tcb_->hbaseFilterValues_.entries() 
> 0 ?
                                            &tcb_->hbaseFilterValues_ : NULL),
+                       
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(),
                                           tcb_->getSamplePercentage(),
                                            FALSE, 0, NULL, NULL, 0,
                                            
(tcb_->hbaseAccessTdb().getHbaseAccessOptions() 
@@ -492,6 +494,7 @@ ExWorkProcRetcode ExHbaseScanSQTaskTcb::work(short &rc)
                                            &tcb_->hbaseFilterOps_ : NULL),
                                           (tcb_->hbaseFilterValues_.entries() 
> 0 ?
                                            &tcb_->hbaseFilterValues_ : NULL),
+                       
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(),
                                            tcb_->getSamplePercentage(),
                                            
tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getUseSnapshotScan(),
                                            
tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getSnapshotScanTimeout(),

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/executor/HBaseClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HBaseClient_JNI.cpp 
b/core/sql/executor/HBaseClient_JNI.cpp
index f0d50ab..1c25123 100644
--- a/core/sql/executor/HBaseClient_JNI.cpp
+++ b/core/sql/executor/HBaseClient_JNI.cpp
@@ -3585,7 +3585,7 @@ HTC_RetCode HTableClient_JNI::init()
     JavaMethods_[JM_GET_ERROR  ].jm_name      = "getLastError";
     JavaMethods_[JM_GET_ERROR  ].jm_signature = "()Ljava/lang/String;";
     JavaMethods_[JM_SCAN_OPEN  ].jm_name      = "startScan";
-    JavaMethods_[JM_SCAN_OPEN  ].jm_signature = 
"(J[B[B[Ljava/lang/Object;JZZI[Ljava/lang/Object;[Ljava/lang/Object;[Ljava/lang/Object;FZZILjava/lang/String;Ljava/lang/String;II)Z";
+    JavaMethods_[JM_SCAN_OPEN  ].jm_signature = 
"(J[B[B[Ljava/lang/Object;JZZI[Ljava/lang/Object;[Ljava/lang/Object;[Ljava/lang/Object;FFZZILjava/lang/String;Ljava/lang/String;II)Z";
     JavaMethods_[JM_DELETE     ].jm_name      = "deleteRow";
     JavaMethods_[JM_DELETE     ].jm_signature = "(J[B[Ljava/lang/Object;JZ)Z";
     JavaMethods_[JM_COPROC_AGGR     ].jm_name      = "coProcAggr";
@@ -3631,6 +3631,7 @@ HTC_RetCode HTableClient_JNI::startScan(Int64 transID, 
const Text& startRowID,
                                        const LIST(NAString) 
*inColNamesToFilter, 
                                        const LIST(NAString) *inCompareOpList,
                                        const LIST(NAString) 
*inColValuesToCompare,
+                    Float32 dopParallelScanner,
                                        Float32 samplePercent,
                                        NABoolean useSnapshotScan,
                                        Lng32 snapTimeout,
@@ -3754,6 +3755,7 @@ HTC_RetCode HTableClient_JNI::startScan(Int64 transID, 
const Text& startRowID,
         return HTC_ERROR_SCANOPEN_PARAM;
      }
   }
+  jfloat j_dopParallelScanner = dopParallelScanner;
   jfloat j_smplPct = samplePercent;
   jboolean j_useSnapshotScan = useSnapshotScan;
   jint j_snapTimeout = snapTimeout;
@@ -3786,6 +3788,7 @@ HTC_RetCode HTableClient_JNI::startScan(Int64 transID, 
const Text& startRowID,
                                               
JavaMethods_[JM_SCAN_OPEN].methodID, 
                                               j_tid, jba_startRowID, 
jba_stopRowID, j_cols, j_ts, j_cb, j_smallScanner, j_ncr,
                                               j_colnamestofilter, 
j_compareoplist, j_colvaluestocompare, 
+                                              j_dopParallelScanner,
                                               j_smplPct, j_preFetch, 
j_useSnapshotScan,
                                               j_snapTimeout, js_snapName, 
js_tmp_loc, j_espNum,
                                               j_versions);

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/executor/HBaseClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HBaseClient_JNI.h 
b/core/sql/executor/HBaseClient_JNI.h
index 9315024..373098c 100644
--- a/core/sql/executor/HBaseClient_JNI.h
+++ b/core/sql/executor/HBaseClient_JNI.h
@@ -249,6 +249,7 @@ public:
                        const LIST(NAString) *inColNamesToFilter, 
                        const LIST(NAString) *inCompareOpList,
                        const LIST(NAString) *inColValuesToCompare,
+            Float32 dopParallelScanner = 0.0f,
                        Float32 samplePercent = -1.0f,
                        NABoolean useSnapshotScan = FALSE,
                        Lng32 snapTimeout = 0,

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/exp/ExpHbaseInterface.cpp
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpHbaseInterface.cpp 
b/core/sql/exp/ExpHbaseInterface.cpp
index c7914d0..7fa7d53 100644
--- a/core/sql/exp/ExpHbaseInterface.cpp
+++ b/core/sql/exp/ExpHbaseInterface.cpp
@@ -629,13 +629,14 @@ Lng32 ExpHbaseInterface_JNI::scanOpen(
                                      const LIST(NAString) *inColNamesToFilter,
                                      const LIST(NAString) *inCompareOpList,
                                      const LIST(NAString) 
*inColValuesToCompare,
+                         Float32 dopParallelScanner,
                                      Float32 samplePercent,
                                      NABoolean useSnapshotScan,
                                      Lng32 snapTimeout,
                                      char * snapName,
                                      char * tmpLoc,
                                      Lng32 espNum,
-                                      Lng32 versions)
+                      Lng32 versions)
 {
   htc_ = client_->getHTableClient((NAHeap *)heap_, tblName.val, useTRex_, 
hbs_);
   if (htc_ == NULL)
@@ -659,6 +660,7 @@ Lng32 ExpHbaseInterface_JNI::scanOpen(
                              inColNamesToFilter,
                              inCompareOpList,
                              inColValuesToCompare,
+                             dopParallelScanner,
                              samplePercent,
                              useSnapshotScan,
                              snapTimeout,

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/exp/ExpHbaseInterface.h
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpHbaseInterface.h b/core/sql/exp/ExpHbaseInterface.h
index f711bcf..83d6c55 100644
--- a/core/sql/exp/ExpHbaseInterface.h
+++ b/core/sql/exp/ExpHbaseInterface.h
@@ -158,6 +158,7 @@ class ExpHbaseInterface : public NABasicObject
                         const LIST(NAString) *inColNamesToFilter, 
                         const LIST(NAString) *inCompareOpList,
                         const LIST(NAString) *inColValuesToCompare,
+                Float32 dopParallelScanner = 0.0f,
                         Float32 samplePercent = -1.0f,
                         NABoolean useSnapshotScan = FALSE,
                         Lng32 snapTimeout = 0,
@@ -470,6 +471,7 @@ class ExpHbaseInterface_JNI : public ExpHbaseInterface
                         const LIST(NAString) *inColNamesToFilter, 
                         const LIST(NAString) *inCompareOpList,
                         const LIST(NAString) *inColValuesToCompare,
+                Float32 DOPparallelScanner = 0.0f,
                         Float32 samplePercent = -1.0f,
                         NABoolean useSnapshotScan = FALSE,
                         Lng32 snapTimeout = 0,

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/generator/GenExplain.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenExplain.cpp 
b/core/sql/generator/GenExplain.cpp
index e419dca..42a29df 100644
--- a/core/sql/generator/GenExplain.cpp
+++ b/core/sql/generator/GenExplain.cpp
@@ -956,6 +956,13 @@ HbaseAccess::addSpecificExplainInfo(ExplainTupleMaster 
*explainTuple,
   }
 
   char buf[20];
+
+  if ((((ComTdbHbaseAccess 
*)tdb)->getHbasePerfAttributes()->dopParallelScanner())>0.0) {
+     description += "parallel_scanner: " ;
+     sprintf(buf, "%g ", ((ComTdbHbaseAccess 
*)tdb)->getHbasePerfAttributes()->dopParallelScanner());
+     description += buf;
+  }
+
   if ( getProbes().getValue() > 0.0 ) {
     description += "probes: "; // total number of probes
     sprintf(buf, "%g ", getProbes().getValue());

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/generator/GenRelScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenRelScan.cpp 
b/core/sql/generator/GenRelScan.cpp
index b56a7ae..e1b9468 100644
--- a/core/sql/generator/GenRelScan.cpp
+++ b/core/sql/generator/GenRelScan.cpp
@@ -2815,6 +2815,7 @@ short HbaseAccess::codeGen(Generator * generator)
                                 hbaseBlockSize,
                                 hbpa);
 
+  generator->setHBaseParallelScanner(hbpa);
 
 
   ComTdbHbaseAccess::HbaseAccessOptions * hbo = NULL;

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/generator/Generator.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/Generator.cpp b/core/sql/generator/Generator.cpp
index 24b7eb9..f595192 100644
--- a/core/sql/generator/Generator.cpp
+++ b/core/sql/generator/Generator.cpp
@@ -3158,4 +3158,7 @@ void Generator::setHBaseSmallScanner(Int32 hbaseRowSize, 
double estRowsAccessed,
   hbpa->setMaxNumRowsPerHbaseBlock(hbaseBlockSize/hbaseRowSize);
 }
 
+void Generator::setHBaseParallelScanner(ComTdbHbaseAccess::HbasePerfAttributes 
* hbpa){
+    
hbpa->setDopParallelScanner(CmpCommon::getDefaultNumeric(HBASE_DOP_PARALLEL_SCANNER));
+}
 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/generator/Generator.h
----------------------------------------------------------------------
diff --git a/core/sql/generator/Generator.h b/core/sql/generator/Generator.h
index a057d33..7cd02d6 100644
--- a/core/sql/generator/Generator.h
+++ b/core/sql/generator/Generator.h
@@ -1641,7 +1641,7 @@ public:
                            ComTdbHbaseAccess::HbasePerfAttributes * hbpa);
   void setHBaseSmallScanner(Int32 hbaseRowSize, double rowsAccessed,
                           Int32 hbaseBlockSize, 
ComTdbHbaseAccess::HbasePerfAttributes * hbpa);
-
+  void setHBaseParallelScanner(ComTdbHbaseAccess::HbasePerfAttributes * hbpa);
 
   NASet<Int64> &objectUids() { return objectUids_; }
 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/regress/executor/EXPECTED140
----------------------------------------------------------------------
diff --git a/core/sql/regress/executor/EXPECTED140 
b/core/sql/regress/executor/EXPECTED140
index c82aabd..00d157c 100644
--- a/core/sql/regress/executor/EXPECTED140
+++ b/core/sql/regress/executor/EXPECTED140
@@ -45,6 +45,15 @@
 
 --- 1 row(s) updated.
 >>
+>>upsert using load 
++>into t140b
++>  select  x1,x1,10*x1,100*x1,1000*x1,10*x1+1,100*x1+1, 1000*x1+1, 
'aaa'||cast(x1 as varchar(10)) 
++>
++>  from t140helper
++>transpose 0,1,2,3,4,5,6,7,8,9 as x1;
+
+--- 10 row(s) inserted.
+>>
 >>obey TEST140(run);
 >>-- test returned rows with or without adding key column and test of all 
 >>pushdown functions with null or non null column
 >>-- only one column retrieved
@@ -53,7 +62,7 @@
 ------------------------------------------------------------------ PLAN SUMMARY
 MODULE_NAME .............. DYNAMICALLY COMPILED
 STATEMENT_NAME ........... NOT NAMED
-PLAN_ID .................. 212321974183907757
+PLAN_ID .................. 212326821484634327
 ROWS_OUT ................ 33
 EST_TOTAL_COST ........... 0.05
 STATEMENT ................ select a from t140 where b>500;
@@ -106,7 +115,7 @@ DESCRIPTION
   DDL_TRANSACTIONS ....... ON
   SCHEMA ................. TRAFODION.SCH
   GENERATE_EXPLAIN ....... ON
-  ObjectUIDs ............. 7577396214768470203
+  ObjectUIDs ............. 2948826545036618858
   select_list ............ TRAFODION.SCH.T140.A
 
 
@@ -154,7 +163,7 @@ A
 ------------------------------------------------------------------ PLAN SUMMARY
 MODULE_NAME .............. DYNAMICALLY COMPILED
 STATEMENT_NAME ........... NOT NAMED
-PLAN_ID .................. 212321974185251743
+PLAN_ID .................. 212326821484786794
 ROWS_OUT ................ 33
 EST_TOTAL_COST ........... 0.05
 STATEMENT ................ select an from t140b where b<=200;
@@ -207,7 +216,7 @@ DESCRIPTION
   DDL_TRANSACTIONS ....... ON
   SCHEMA ................. TRAFODION.SCH
   GENERATE_EXPLAIN ....... ON
-  ObjectUIDs ............. 7577396214768470287
+  ObjectUIDs ............. 2948826545036618945
   select_list ............ TRAFODION.SCH.T140B.AN
 
 
@@ -242,7 +251,7 @@ DESCRIPTION
 ------------------------------------------------------------------ PLAN SUMMARY
 MODULE_NAME .............. DYNAMICALLY COMPILED
 STATEMENT_NAME ........... NOT NAMED
-PLAN_ID .................. 212321974185404440
+PLAN_ID .................. 212326821484919166
 ROWS_OUT ................ 33
 EST_TOTAL_COST ........... 0.05
 STATEMENT ................ select an from t140 where b<=200;
@@ -295,7 +304,7 @@ DESCRIPTION
   DDL_TRANSACTIONS ....... ON
   SCHEMA ................. TRAFODION.SCH
   GENERATE_EXPLAIN ....... ON
-  ObjectUIDs ............. 7577396214768470203
+  ObjectUIDs ............. 2948826545036618858
   select_list ............ TRAFODION.SCH.T140.AN
 
 
@@ -341,7 +350,7 @@ AN
 ------------------------------------------------------------------ PLAN SUMMARY
 MODULE_NAME .............. DYNAMICALLY COMPILED
 STATEMENT_NAME ........... NOT NAMED
-PLAN_ID .................. 212321974185568983
+PLAN_ID .................. 212326821485075357
 ROWS_OUT ................ 10
 EST_TOTAL_COST ........... 0.05
 STATEMENT ................ select an from t140 where b=200 and an is not null;
@@ -394,7 +403,7 @@ DESCRIPTION
   DDL_TRANSACTIONS ....... ON
   SCHEMA ................. TRAFODION.SCH
   GENERATE_EXPLAIN ....... ON
-  ObjectUIDs ............. 7577396214768470203
+  ObjectUIDs ............. 2948826545036618858
   select_list ............ TRAFODION.SCH.T140.AN
   input_variables ........ %(200)
 
@@ -438,7 +447,7 @@ AN
 ------------------------------------------------------------------ PLAN SUMMARY
 MODULE_NAME .............. DYNAMICALLY COMPILED
 STATEMENT_NAME ........... NOT NAMED
-PLAN_ID .................. 212321974185775205
+PLAN_ID .................. 212326821485216670
 ROWS_OUT ................ 67
 EST_TOTAL_COST ........... 0.05
 STATEMENT ................ select an, a from t140 where b!=500;
@@ -491,7 +500,7 @@ DESCRIPTION
   DDL_TRANSACTIONS ....... ON
   SCHEMA ................. TRAFODION.SCH
   GENERATE_EXPLAIN ....... ON
-  ObjectUIDs ............. 7577396214768470203
+  ObjectUIDs ............. 2948826545036618858
   select_list ............ TRAFODION.SCH.T140.AN, TRAFODION.SCH.T140.A
 
 
@@ -571,7 +580,7 @@ AN
 ------------------------------------------------------------------ PLAN SUMMARY
 MODULE_NAME .............. DYNAMICALLY COMPILED
 STATEMENT_NAME ........... NOT NAMED
-PLAN_ID .................. 212321974185993581
+PLAN_ID .................. 212326821485412711
 ROWS_OUT ................ 10
 EST_TOTAL_COST ........... 0.05
 STATEMENT ................ select an from t140 where bn=201 and an is not null;
@@ -624,7 +633,7 @@ DESCRIPTION
   DDL_TRANSACTIONS ....... ON
   SCHEMA ................. TRAFODION.SCH
   GENERATE_EXPLAIN ....... ON
-  ObjectUIDs ............. 7577396214768470203
+  ObjectUIDs ............. 2948826545036618858
   select_list ............ TRAFODION.SCH.T140.AN
   input_variables ........ %(201)
 
@@ -667,7 +676,7 @@ AN
 ------------------------------------------------------------------ PLAN SUMMARY
 MODULE_NAME .............. DYNAMICALLY COMPILED
 STATEMENT_NAME ........... NOT NAMED
-PLAN_ID .................. 212321974186137176
+PLAN_ID .................. 212326821485540922
 ROWS_OUT ................ 67
 EST_TOTAL_COST ........... 0.05
 STATEMENT ................ select an, a from t140 where bn!=501;
@@ -720,7 +729,7 @@ DESCRIPTION
   DDL_TRANSACTIONS ....... ON
   SCHEMA ................. TRAFODION.SCH
   GENERATE_EXPLAIN ....... ON
-  ObjectUIDs ............. 7577396214768470203
+  ObjectUIDs ............. 2948826545036618858
   select_list ............ TRAFODION.SCH.T140.AN, TRAFODION.SCH.T140.A
 
 
@@ -815,7 +824,7 @@ A
 ------------------------------------------------------------------ PLAN SUMMARY
 MODULE_NAME .............. DYNAMICALLY COMPILED
 STATEMENT_NAME ........... NOT NAMED
-PLAN_ID .................. 212321974186325095
+PLAN_ID .................. 212326821485768519
 ROWS_OUT ................ 11
 EST_TOTAL_COST ........... 0.05
 STATEMENT ................ select an from t140 where an between 20 and 40;
@@ -868,7 +877,7 @@ DESCRIPTION
   DDL_TRANSACTIONS ....... ON
   SCHEMA ................. TRAFODION.SCH
   GENERATE_EXPLAIN ....... ON
-  ObjectUIDs ............. 7577396214768470203
+  ObjectUIDs ............. 2948826545036618858
   select_list ............ TRAFODION.SCH.T140.AN
 
 
@@ -918,6 +927,400 @@ AN
 
 --- 4 row(s) selected.
 >>
+>>-- test parallel scanner
+>>-- turn off small scanner as it will force single scanner
+>>cqd hbase_small_scanner 'OFF';
+
+--- SQL operation complete.
+>>cqd parallel_num_esps '1';
+
+--- SQL operation complete.
+>>-- force 2 threads
+>>cqd hbase_dop_parallel_scanner '2.0';
+
+--- SQL operation complete.
+>>explain select avg(a) from t140b;
+
+------------------------------------------------------------------ PLAN SUMMARY
+MODULE_NAME .............. DYNAMICALLY COMPILED
+STATEMENT_NAME ........... NOT NAMED
+PLAN_ID .................. 212326821485986085
+ROWS_OUT ................. 1
+EST_TOTAL_COST ........... 0.05
+STATEMENT ................ select avg(a) from t140b;
+
+
+------------------------------------------------------------------ NODE LISTING
+ROOT ======================================  SEQ_NO 3        ONLY CHILD 2
+REQUESTS_IN .............. 1
+ROWS_OUT ................. 1
+EST_OPER_COST ............ 0
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est ........... 1
+  fragment_id ............ 0
+  parent_frag ............ (none)
+  fragment_type .......... master
+  statement_index ........ 0
+  affinity_value ......... 0
+  max_max_cardinality    100
+  total_overflow_size .... 0.00 KB
+  xn_access_mode ......... read_only
+  xn_autoabort_interval    0
+  auto_query_retry ....... enabled
+  plan_version ....... 2,600
+  embedded_arkcmp ........ used
+  IS_SQLCI ............... ON
+  LDAP_USERNAME
+  TARGET_CODE ............ DEBUG
+  TARGET_CPU_FREQUENCY   199
+  TARGET_IO_SEEK_TIME .... 0.02
+  TARGET_IO_SEQ_READ_RATE  2.5
+  TARGET_MSG_LOCAL_RATE   10
+  TARGET_MSG_LOCAL_TIME    0.01
+  TARGET_MSG_REMOTE_RAT  100
+  TARGET_MSG_REMOTE_TIME   0.01
+  ARKCMP_FAKE_HW ......... ON
+  SKIP_METADATA_VIEWS .... ON
+  DEF_NUM_SMP_CPUS ....... 2
+  MAX_ESPS_PER_CPU_PER_OP  1
+  DEF_NUM_NODES_IN_ACTIVE  1
+  POS_ALLOW_NON_PK_TABLES  ON
+  MODE_SEABASE ........... ON
+  SEABASE_VOLATILE_TABLES  ON
+  HBASE_ASYNC_DROP_TABLE   OFF
+  HBASE_SERIALIZATION .... ON
+  HBASE_FILTER_PREDS ..... 2
+  TRAF_ALIGNED_ROW_FORMAT  OFF
+  TRAF_INDEX_CREATE_OPT    ON
+  DDL_TRANSACTIONS ....... ON
+  SCHEMA ................. TRAFODION.SCH
+  HBASE_SMALL_SCANNER .... OFF
+  PARALLEL_NUM_ESPS ...... 1
+  HBASE_DOP_PARALLEL_SCAN  2
+  GENERATE_EXPLAIN ....... ON
+  ObjectUIDs ............. 2948826545036618945
+  select_list ............ cast(cast((cast((cast((cast(sum(TRAFODION.SCH.T140B.
+                             A)) * 10000 ...0)) / cast(count(1 )))) / 10000
+                             ...0)))
+
+
+SORT_SCALAR_AGGR ==========================  SEQ_NO 2        ONLY CHILD 1
+REQUESTS_IN .............. 1
+ROWS_OUT ................. 1
+EST_OPER_COST ............ 0.01
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est ........... 1
+  fragment_id ............ 0
+  parent_frag ............ (none)
+  fragment_type .......... master
+  aggregates ............. sum(TRAFODION.SCH.T140B.A), count(1 )
+
+
+TRAFODION_SCAN ============================  SEQ_NO 1        NO CHILDREN
+TABLE_NAME ............... T140B
+REQUESTS_IN .............. 1
+ROWS_OUT ............... 100
+EST_OPER_COST ............ 0.05
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est ......... 100
+  fragment_id ............ 0
+  parent_frag ............ (none)
+  fragment_type .......... master
+  scan_type .............. subset scan of table TRAFODION.SCH.T140B
+  object_type ............ Trafodion
+  columns ................ all
+  begin_keys(incl)
+  end_keys(incl)
+  cache_size ........... 100
+  parallel_scanner ....... 2
+  probes ................. 1
+  rows_accessed ........ 100
+  column_retrieved ....... #1:3
+  key_columns ............ _SALT_, UNIQ, UNIQ2
+
+--- SQL operation complete.
+>>select avg(a) from t140b;
+
+(EXPR)              
+--------------------
+
+                  45
+
+--- 1 row(s) selected.
+>>-- force 100% of threads (with 2 partition this is 2 threads)
+>>cqd hbase_dop_parallel_scanner '1.0';
+
+--- SQL operation complete.
+>>explain select avg(a) from t140b;
+
+------------------------------------------------------------------ PLAN SUMMARY
+MODULE_NAME .............. DYNAMICALLY COMPILED
+STATEMENT_NAME ........... NOT NAMED
+PLAN_ID .................. 212326821486288071
+ROWS_OUT ................. 1
+EST_TOTAL_COST ........... 0.05
+STATEMENT ................ select avg(a) from t140b;
+
+
+------------------------------------------------------------------ NODE LISTING
+ROOT ======================================  SEQ_NO 3        ONLY CHILD 2
+REQUESTS_IN .............. 1
+ROWS_OUT ................. 1
+EST_OPER_COST ............ 0
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est ........... 1
+  fragment_id ............ 0
+  parent_frag ............ (none)
+  fragment_type .......... master
+  statement_index ........ 0
+  affinity_value ......... 0
+  max_max_cardinality    100
+  total_overflow_size .... 0.00 KB
+  xn_access_mode ......... read_only
+  xn_autoabort_interval    0
+  auto_query_retry ....... enabled
+  plan_version ....... 2,600
+  embedded_arkcmp ........ used
+  IS_SQLCI ............... ON
+  LDAP_USERNAME
+  TARGET_CODE ............ DEBUG
+  TARGET_CPU_FREQUENCY   199
+  TARGET_IO_SEEK_TIME .... 0.02
+  TARGET_IO_SEQ_READ_RATE  2.5
+  TARGET_MSG_LOCAL_RATE   10
+  TARGET_MSG_LOCAL_TIME    0.01
+  TARGET_MSG_REMOTE_RAT  100
+  TARGET_MSG_REMOTE_TIME   0.01
+  ARKCMP_FAKE_HW ......... ON
+  SKIP_METADATA_VIEWS .... ON
+  DEF_NUM_SMP_CPUS ....... 2
+  MAX_ESPS_PER_CPU_PER_OP  1
+  DEF_NUM_NODES_IN_ACTIVE  1
+  POS_ALLOW_NON_PK_TABLES  ON
+  MODE_SEABASE ........... ON
+  SEABASE_VOLATILE_TABLES  ON
+  HBASE_ASYNC_DROP_TABLE   OFF
+  HBASE_SERIALIZATION .... ON
+  HBASE_FILTER_PREDS ..... 2
+  TRAF_ALIGNED_ROW_FORMAT  OFF
+  TRAF_INDEX_CREATE_OPT    ON
+  DDL_TRANSACTIONS ....... ON
+  SCHEMA ................. TRAFODION.SCH
+  HBASE_SMALL_SCANNER .... OFF
+  PARALLEL_NUM_ESPS ...... 1
+  HBASE_DOP_PARALLEL_SCAN  1
+  GENERATE_EXPLAIN ....... ON
+  ObjectUIDs ............. 2948826545036618945
+  select_list ............ cast(cast((cast((cast((cast(sum(TRAFODION.SCH.T140B.
+                             A)) * 10000 ...0)) / cast(count(1 )))) / 10000
+                             ...0)))
+
+
+SORT_SCALAR_AGGR ==========================  SEQ_NO 2        ONLY CHILD 1
+REQUESTS_IN .............. 1
+ROWS_OUT ................. 1
+EST_OPER_COST ............ 0.01
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est ........... 1
+  fragment_id ............ 0
+  parent_frag ............ (none)
+  fragment_type .......... master
+  aggregates ............. sum(TRAFODION.SCH.T140B.A), count(1 )
+
+
+TRAFODION_SCAN ============================  SEQ_NO 1        NO CHILDREN
+TABLE_NAME ............... T140B
+REQUESTS_IN .............. 1
+ROWS_OUT ............... 100
+EST_OPER_COST ............ 0.05
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est ......... 100
+  fragment_id ............ 0
+  parent_frag ............ (none)
+  fragment_type .......... master
+  scan_type .............. subset scan of table TRAFODION.SCH.T140B
+  object_type ............ Trafodion
+  columns ................ all
+  begin_keys(incl)
+  end_keys(incl)
+  cache_size ........... 100
+  parallel_scanner ....... 1
+  probes ................. 1
+  rows_accessed ........ 100
+  column_retrieved ....... #1:3
+  key_columns ............ _SALT_, UNIQ, UNIQ2
+
+--- SQL operation complete.
+>>select avg(a) from t140b;
+
+(EXPR)              
+--------------------
+
+                  45
+
+--- 1 row(s) selected.
+>>-- reset to regular scanner
+>>cqd hbase_dop_parallel_scanner reset;
+
+--- SQL operation complete.
+>>cqd hbase_small_scanner reset;
+
+--- SQL operation complete.
+>>cqd parallel_num_esps reset;
+
+--- SQL operation complete.
+>>explain select avg(a) from t140b;
+
+------------------------------------------------------------------ PLAN SUMMARY
+MODULE_NAME .............. DYNAMICALLY COMPILED
+STATEMENT_NAME ........... NOT NAMED
+PLAN_ID .................. 212326821486475177
+ROWS_OUT ................. 1
+EST_TOTAL_COST ........... 0.05
+STATEMENT ................ select avg(a) from t140b;
+
+
+------------------------------------------------------------------ NODE LISTING
+ROOT ======================================  SEQ_NO 5        ONLY CHILD 4
+REQUESTS_IN .............. 1
+ROWS_OUT ................. 1
+EST_OPER_COST ............ 0
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est ........... 1
+  fragment_id ............ 0
+  parent_frag ............ (none)
+  fragment_type .......... master
+  statement_index ........ 0
+  affinity_value ......... 0
+  est_memory_per_cpu ..... 1 KB
+  max_max_cardinality    100
+  total_overflow_size .... 0.00 KB
+  esp_2_node_map ......... (\NSK:-1:-1)
+  xn_access_mode ......... read_only
+  xn_autoabort_interval    0
+  auto_query_retry ....... enabled
+  plan_version ....... 2,600
+  embedded_arkcmp ........ used
+  IS_SQLCI ............... ON
+  LDAP_USERNAME
+  TARGET_CODE ............ DEBUG
+  TARGET_CPU_FREQUENCY   199
+  TARGET_IO_SEEK_TIME .... 0.02
+  TARGET_IO_SEQ_READ_RATE  2.5
+  TARGET_MSG_LOCAL_RATE   10
+  TARGET_MSG_LOCAL_TIME    0.01
+  TARGET_MSG_REMOTE_RAT  100
+  TARGET_MSG_REMOTE_TIME   0.01
+  ARKCMP_FAKE_HW ......... ON
+  SKIP_METADATA_VIEWS .... ON
+  DEF_NUM_SMP_CPUS ....... 2
+  MAX_ESPS_PER_CPU_PER_OP  1
+  DEF_NUM_NODES_IN_ACTIVE  1
+  POS_ALLOW_NON_PK_TABLES  ON
+  MODE_SEABASE ........... ON
+  SEABASE_VOLATILE_TABLES  ON
+  HBASE_ASYNC_DROP_TABLE   OFF
+  HBASE_SERIALIZATION .... ON
+  HBASE_FILTER_PREDS ..... 2
+  TRAF_ALIGNED_ROW_FORMAT  OFF
+  TRAF_INDEX_CREATE_OPT    ON
+  DDL_TRANSACTIONS ....... ON
+  SCHEMA ................. TRAFODION.SCH
+  GENERATE_EXPLAIN ....... ON
+  ObjectUIDs ............. 2948826545036618945
+  select_list ............ cast(cast(cast((cast((cast((cast(sum(sum(TRAFODION.S
+                             CH.T140B.A))) * 10000 ...0)) / cast(sum(count(1
+                             ))))) / 10000 ...0))))
+
+
+SORT_PARTIAL_AGGR_ROOT ====================  SEQ_NO 4        ONLY CHILD 3
+REQUESTS_IN .............. 1
+ROWS_OUT ................. 1
+EST_OPER_COST ............ 0.01
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est ........... 1
+  fragment_id ............ 0
+  parent_frag ............ (none)
+  fragment_type .......... master
+  aggregates ............. sum(sum(TRAFODION.SCH.T140B.A)), sum(count(1 ))
+
+
+ESP_EXCHANGE ==============================  SEQ_NO 3        ONLY CHILD 2
+REQUESTS_IN .............. 1
+ROWS_OUT ................ 50
+EST_OPER_COST ............ 0.01
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est .......... 50
+  fragment_id ............ 2
+  parent_frag ............ 0
+  fragment_type .......... esp
+  est_memory_per_cpu ..... 1 KB
+  buffer_size ........ 6,250
+  record_length ......... 24
+  parent_processes ....... 1
+  child_processes ........ 2
+  child_partitioning_func  hash2 partitioned 2 ways on
+                             (TRAFODION.SCH.T140B.UNIQ,
+                             TRAFODION.SCH.T140B.UNIQ2)
+  seamonster_query ....... no
+  seamonster_exchange .... no
+
+
+SORT_PARTIAL_AGGR_LEAF ====================  SEQ_NO 2        ONLY CHILD 1
+REQUESTS_IN .............. 1
+ROWS_OUT ................ 50
+EST_OPER_COST ............ 0.01
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est .......... 50
+  fragment_id ............ 2
+  parent_frag ............ 0
+  fragment_type .......... esp
+  aggregates ............. sum(TRAFODION.SCH.T140B.A), count(1 )
+
+
+TRAFODION_SCAN ============================  SEQ_NO 1        NO CHILDREN
+TABLE_NAME ............... T140B
+REQUESTS_IN .............. 1
+ROWS_OUT ............... 100
+EST_OPER_COST ............ 0.05
+EST_TOTAL_COST ........... 0.05
+DESCRIPTION
+  max_card_est ......... 100
+  fragment_id ............ 2
+  parent_frag ............ 0
+  fragment_type .......... esp
+  scan_type .............. subset scan of table TRAFODION.SCH.T140B
+  object_type ............ Trafodion
+  cache_size ........... 100
+  probes ................. 1
+  rows_accessed ........ 100
+  column_retrieved ....... #1:3
+  key_columns ............ _SALT_, UNIQ, UNIQ2
+  begin_key .............. (_SALT_ = (\:_sys_HostVarLoHashPart Hash2Distrib
+                             2)), (UNIQ = <min>), (UNIQ2 = <min>)
+  end_key ................ (_SALT_ = (\:_sys_HostVarHiHashPart Hash2Distrib
+                             2)), (UNIQ = <max>), (UNIQ2 = <max>)
+
+--- SQL operation complete.
+>>select avg(a) from t140b;
+
+(EXPR)              
+--------------------
+
+                  45
+
+--- 1 row(s) selected.
+>>
 >>obey TEST140(clnup);
 >>drop table t140helper;
 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/regress/executor/TEST140
----------------------------------------------------------------------
diff --git a/core/sql/regress/executor/TEST140 
b/core/sql/regress/executor/TEST140
index 1540522..424b12a 100644
--- a/core/sql/regress/executor/TEST140
+++ b/core/sql/regress/executor/TEST140
@@ -21,7 +21,7 @@
 --
 -- @@@ END COPYRIGHT @@@
 --
--- Functionality: Advanced predicate pushdown (V2)
+-- Functionality: Advanced predicate pushdown (V2) and parallelScanner
 -- Expected files: EXPECTED140
 -- Table created: t140 t140b t140helper
 -- Limitations:
@@ -63,6 +63,13 @@ insert into t140 values(14,14,42,402,4002,43,403,4003,NULL);
 insert into t140 values(15,15,52,502,5002,53,503,5003,'asd');
 update t140 set d = null where uniq = 15;
 
+upsert using load 
+into t140b
+  select  x1,x1,10*x1,100*x1,1000*x1,10*x1+1,100*x1+1, 1000*x1+1, 
'aaa'||cast(x1 as varchar(10)) 
+         
+  from t140helper
+transpose 0,1,2,3,4,5,6,7,8,9 as x1;
+
 ?section run
 -- test returned rows with or without adding key column and test of all 
pushdown functions with null or non null column
 -- only one column retrieved
@@ -94,3 +101,22 @@ explain select an from t140 where an between 20 and 40;
 select an from t140 where an between 20 and 40;
 select an from t140 where an in (21,41,51,61,10);
 
+-- test parallel scanner
+-- turn off small scanner as it will force single scanner
+cqd hbase_small_scanner 'OFF';
+cqd parallel_num_esps '1';
+-- force 2 threads
+cqd hbase_dop_parallel_scanner '2.0';
+explain select avg(a) from t140b;
+select avg(a) from t140b;
+-- force 100% of threads (with 2 partition this is 2 threads)
+cqd hbase_dop_parallel_scanner '1.0';
+explain select avg(a) from t140b;
+select avg(a) from t140b;
+-- reset to regular scanner
+cqd hbase_dop_parallel_scanner reset;
+cqd hbase_small_scanner reset;
+cqd parallel_num_esps reset;
+explain select avg(a) from t140b;
+select avg(a) from t140b;
+

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/sqlcomp/DefaultConstants.h
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/DefaultConstants.h 
b/core/sql/sqlcomp/DefaultConstants.h
index 5c0d13b..d284fc1 100644
--- a/core/sql/sqlcomp/DefaultConstants.h
+++ b/core/sql/sqlcomp/DefaultConstants.h
@@ -3808,6 +3808,10 @@ enum DefaultConstants
   // Currently syskey, _salt_, _division_.
   TRAF_ALLOW_RESERVED_COLNAMES,
 
+  // If 0, the regular scanner is used. From 0.x to 1.0, the value is the percentage of the regions that need to be scanned that will be scanned in parallel.
+  // If >= 2, it sets a fixed number of threads (the real DOP): 2.0 means 2 threads, 3.0 means 3 threads, etc.
+  HBASE_DOP_PARALLEL_SCANNER,
+
   // This enum constant must be the LAST one in the list; it's a count,
   // not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)!
   __NUM_DEFAULT_ATTRIBUTES

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index 9d53aea..2b2c3ed 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -1757,7 +1757,7 @@ SDDkwd__(EXE_DIAGNOSTIC_EVENTS,           "OFF"),
  // We can remove this once the delete costing code has broader
  // exposure.
  DDkwd__(HBASE_DELETE_COSTING,                      "ON"),
-
+ DDflt0_(HBASE_DOP_PARALLEL_SCANNER,             "0."),
  DDkwd__(HBASE_FILTER_PREDS,                        "OFF"),
  DDkwd__(HBASE_HASH2_PARTITIONING,                   "ON"),
  DDui___(HBASE_INDEX_LEVEL,                          "0"),

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/a7f9a628/core/sql/src/main/java/org/trafodion/sql/HTableClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HTableClient.java 
b/core/sql/src/main/java/org/trafodion/sql/HTableClient.java
index 0760b90..4a9bc07 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HTableClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HTableClient.java
@@ -778,6 +778,7 @@ public class HTableClient {
                                  Object[] colNamesToFilter, 
                                  Object[] compareOpList, 
                                  Object[] colValuesToCompare,
+                                 float dopParallelScanner,
                                  float samplePercent,
                                  boolean inPreFetch,
                                  boolean useSnapshotScan,
@@ -979,9 +980,9 @@ public class HTableClient {
            if (useTRexScanner && (transID != 0)) {
              scanner = table.getScanner(transID, scan);
            } else {
-             scanner = table.getScanner(scan);
-           }
-           if (logger.isTraceEnabled()) logger.trace("startScan(). After 
getScanner. Scanner: " + scanner);
+          scanner = table.getScanner(scan,dopParallelScanner);
+        }
+        if (logger.isTraceEnabled()) logger.trace("startScan(). After 
getScanner. Scanner: " + scanner+" dop:"+dopParallelScanner);
          }
          else
          {

Reply via email to