This is an automated email from the ASF dual-hosted git repository.
mwalch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new f2d0371 Added additional client properties for Scanner and
BatchScanner (#548)
f2d0371 is described below
commit f2d037138859681daf9072807c270f2ed96638a4
Author: Mike Walch <[email protected]>
AuthorDate: Fri Jul 6 12:16:18 2018 -0400
Added additional client properties for Scanner and BatchScanner (#548)
---
.../org/apache/accumulo/core/client/Connector.java | 32 +++++++++++++++++++++-
.../accumulo/core/client/impl/ConnectorImpl.java | 30 +++++++++++++++++++-
.../accumulo/core/conf/ClientConfigGenerate.java | 2 ++
.../apache/accumulo/core/conf/ClientProperty.java | 8 ++++++
4 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
index 2a7ee36..c85da31 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
@@ -57,6 +57,27 @@ public abstract class Connector {
int numQueryThreads) throws TableNotFoundException;
/**
+ * Factory method to create a BatchScanner connected to Accumulo. This method uses the number of
+ * query threads configured when this Connector was created (the
+ * {@code batch.scanner.num.query.threads} client property, settable via
+ * {@link ConnectionOptions#withBatchScannerQueryThreads(int)}). If none were configured, defaults
+ * will be used.
+ *
+ * @param tableName
+ * the name of the table to query
+ * @param authorizations
+ * A set of authorization labels that will be checked against the column visibility of
+ * each key in order to filter data. The authorizations passed in must be a subset of the
+ * accumulo user's set of authorizations. If the accumulo user has authorizations (A1,
+ * A2) and authorizations (A2, A3) are passed, then an exception will be thrown.
+ *
+ * @return BatchScanner object for configuring and querying
+ * @throws TableNotFoundException
+ * when the specified table doesn't exist
+ * @since 2.0.0
+ */
+ public abstract BatchScanner createBatchScanner(String tableName, Authorizations authorizations)
+ throws TableNotFoundException;
+
+ /**
* Factory method to create a BatchDeleter connected to Accumulo.
*
* @param tableName
@@ -165,7 +186,6 @@ public abstract class Connector {
* @return BatchWriter object for configuring and writing data to
* @since 1.5.0
*/
-
public abstract BatchWriter createBatchWriter(String tableName,
BatchWriterConfig config)
throws TableNotFoundException;
@@ -585,6 +605,16 @@ public abstract class Connector {
* @return this builder
*/
ConnectionOptions withBatchWriterConfig(BatchWriterConfig
batchWriterConfig);
+
+ /**
+ * Sets the default number of query threads used by BatchScanners created with
+ * {@link Connector#createBatchScanner(String, Authorizations)}. The value is stored as the
+ * {@code batch.scanner.num.query.threads} client property.
+ *
+ * @param numQueryThreads
+ * number of concurrent query threads a BatchScanner should use
+ * @return this builder
+ * @since 2.0.0
+ */
+ ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads);
+
+ /**
+ * Sets the default batch size (number of key/value pairs fetched at a time) applied to Scanners
+ * created by this Connector. The value is stored as the {@code scanner.batch.size} client
+ * property.
+ *
+ * @param batchSize
+ * number of key/value pairs a Scanner should fetch per batch
+ * @return this builder
+ * @since 2.0.0
+ */
+ ConnectionOptions withScannerBatchSize(int batchSize);
}
public interface FromOptions extends ConnectionOptions, PropertyOptions,
AuthenticationArgs {
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
index 244f764..eac7532 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
@@ -21,6 +21,7 @@ import static
com.google.common.base.Preconditions.checkArgument;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -108,6 +109,15 @@ public class ConnectorImpl extends Connector {
numQueryThreads);
}
+ @Override
+ public BatchScanner createBatchScanner(String tableName, Authorizations
authorizations)
+ throws TableNotFoundException {
+ Integer numQueryThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS
+ .getInteger(context.getClientInfo().getProperties());
+ Objects.requireNonNull(numQueryThreads);
+ return createBatchScanner(tableName, authorizations, numQueryThreads);
+ }
+
@Deprecated
@Override
public BatchDeleter createBatchDeleter(String tableName, Authorizations
authorizations,
@@ -191,7 +201,13 @@ public class ConnectorImpl extends Connector {
throws TableNotFoundException {
checkArgument(tableName != null, "tableName is null");
checkArgument(authorizations != null, "authorizations is null");
- return new ScannerImpl(context, getTableId(tableName), authorizations);
+ Scanner scanner = new ScannerImpl(context, getTableId(tableName),
authorizations);
+ Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE
+ .getInteger(context.getClientInfo().getProperties());
+ if (batchSize != null) {
+ scanner.setBatchSize(batchSize);
+ }
+ return scanner;
}
@Override
@@ -348,6 +364,18 @@ public class ConnectorImpl extends Connector {
}
@Override
+ public ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads) {
+ // Reject non-positive thread counts up front, at configuration time, instead of silently
+ // storing a value that BatchScanners created from this connector would then use.
+ checkArgument(numQueryThreads > 0, "numQueryThreads must be positive, got %s", numQueryThreads);
+ setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS, numQueryThreads);
+ return this;
+ }
+
+ @Override
+ public ConnectionOptions withScannerBatchSize(int batchSize) {
+ // Reject non-positive batch sizes up front, at configuration time, instead of silently
+ // storing a value that every Scanner created from this connector would then be given.
+ checkArgument(batchSize > 0, "batchSize must be positive, got %s", batchSize);
+ setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize);
+ return this;
+ }
+
+ @Override
public SaslOptions withPrimary(String kerberosServerPrimary) {
setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY,
kerberosServerPrimary);
return this;
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
index 7b95ba2..691c660 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientConfigGenerate.java
@@ -47,6 +47,8 @@ class ClientConfigGenerate {
generateSection("Instance", "instance.");
generateSection("Authentication", "auth.", "auth.type",
"auth.principal");
generateSection("Batch Writer", "batch.writer.");
+ generateSection("Batch Scanner", "batch.scanner.");
+ generateSection("Scanner", "scanner.");
generateSection("SSL", "ssl.");
generateSection("SASL", "sasl.");
generateSection("Tracing", "trace.");
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
index 1ec1609..0b47e7f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java
@@ -61,6 +61,14 @@ public enum ClientProperty {
"Change the" + " durability for the BatchWriter session. To use the
table's durability"
+ " setting. use \"default\" which is the table's durability
setting."),
+ // Scanner
+ SCANNER_BATCH_SIZE("scanner.batch.size", "1000",
+ "Number of key/value pairs that will be fetched at time from tablet
server"),
+
+ // BatchScanner
+ BATCH_SCANNER_NUM_QUERY_THREADS("batch.scanner.num.query.threads", "3",
+ "Number of concurrent query threads to spawn for querying"),
+
// Bulk load
BULK_LOAD_THREADS("bulk.threads", "8C",
"The number of threads used to inspect bulk load files to determine
where files go. "