This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 31a2de7f001ef3c5572fb17bf9f0db1c2802289f Author: Mike Miller <mmil...@apache.org> AuthorDate: Thu Sep 6 16:32:08 2018 -0400 Created AccumuloClient from Connector #636 --- .../accumulo/core/client/AccumuloClient.java | 532 +++++++++++++++++++++ .../core/client/impl/AccumuloClientImpl.java | 449 +++++++++++++++++ .../accumulo/test/functional/AccumuloClientIT.java | 95 ++++ 3 files changed, 1076 insertions(+) diff --git a/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java new file mode 100644 index 0000000..1fe865a --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/AccumuloClient.java @@ -0,0 +1,532 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client; + +import java.util.Properties; + +import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.NamespaceOperations; +import org.apache.accumulo.core.client.admin.ReplicationOperations; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.impl.AccumuloClientImpl; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.security.Authorizations; + +/** + * Client connection to an Accumulo instance. Allows the user to request a scanner, deleter or + * writer for the instance as well as various objects that permit administrative operations. + * Enforces security on the client side with by requiring user credentials. + * + * Supports fluent API. Various options can be provided to {@link #builder()} and when finished a + * call to build() will return the AccumuloClient object. For example: + * + * {@code AccumuloClient.builder().forInstance(instanceName, zookeepers) + * .usingPassword(user, password).withZkTimeout(1234).build();} + * + * @since 2.0.0 + */ +public interface AccumuloClient { + + /** + * Factory method to create a BatchScanner connected to Accumulo. + * + * @param tableName + * the name of the table to query + * @param authorizations + * A set of authorization labels that will be checked against the column visibility of + * each key in order to filter data. The authorizations passed in must be a subset of the + * accumulo user's set of authorizations. If the accumulo user has authorizations (A1, + * A2) and authorizations (A2, A3) are passed, then an exception will be thrown. + * @param numQueryThreads + * the number of concurrent threads to spawn for querying + * + * @return BatchScanner object for configuring and querying + * @throws TableNotFoundException + * when the specified table doesn't exist + */ + public BatchScanner createBatchScanner(String tableName, Authorizations authorizations, + int numQueryThreads) throws TableNotFoundException; + + /** + * Factory method to create a BatchScanner connected to Accumulo. This method uses the number of + * query threads configured when AccumuloClient was created. If none were configured, defaults + * will be used. + * + * @param tableName + * the name of the table to query + * @param authorizations + * A set of authorization labels that will be checked against the column visibility of + * each key in order to filter data. The authorizations passed in must be a subset of the + * accumulo user's set of authorizations. If the accumulo user has authorizations (A1, + * A2) and authorizations (A2, A3) are passed, then an exception will be thrown. + * + * @return BatchScanner object for configuring and querying + * @throws TableNotFoundException + * when the specified table doesn't exist + */ + public BatchScanner createBatchScanner(String tableName, Authorizations authorizations) + throws TableNotFoundException; + + /** + * Factory method to create BatchDeleter + * + * @param tableName + * the name of the table to query and delete from + * @param authorizations + * A set of authorization labels that will be checked against the column visibility of + * each key in order to filter data. The authorizations passed in must be a subset of the + * accumulo user's set of authorizations. If the accumulo user has authorizations (A1, + * A2) and authorizations (A2, A3) are passed, then an exception will be thrown. + * @param numQueryThreads + * the number of concurrent threads to spawn for querying + * @param config + * configuration used to create batch writer. This config takes precedence. Any unset + * values will be merged with config set when the AccumuloClient was created. If no + * config was set during AccumuloClient creation, BatchWriterConfig defaults will be + * used. + * @return BatchDeleter object for configuring and deleting + */ + + public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, + int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException; + + /** + * Factory method to create BatchDeleter. This method uses BatchWriterConfig set when + * AccumuloClient was created. If none was set, BatchWriterConfig defaults will be used. + * + * @param tableName + * the name of the table to query and delete from + * @param authorizations + * A set of authorization labels that will be checked against the column visibility of + * each key in order to filter data. The authorizations passed in must be a subset of the + * accumulo user's set of authorizations. If the accumulo user has authorizations (A1, + * A2) and authorizations (A2, A3) are passed, then an exception will be thrown. + * @param numQueryThreads + * the number of concurrent threads to spawn for querying + * @return BatchDeleter object + * @throws TableNotFoundException + * if table not found + */ + public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, + int numQueryThreads) throws TableNotFoundException; + + /** + * Factory method to create a BatchWriter connected to Accumulo. + * + * @param tableName + * the name of the table to insert data into + * @param config + * configuration used to create batch writer. This config will take precedence. Any unset + * values will merged with config set when the AccumuloClient was created. If no config + * was set during AccumuloClient creation, BatchWriterConfig defaults will be used. + * @return BatchWriter object for configuring and writing data to + */ + public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) + throws TableNotFoundException; + + /** + * Factory method to create a BatchWriter. This method uses BatchWriterConfig set when + * AccumuloClient was created. If none was set, BatchWriterConfig defaults will be used. + * + * @param tableName + * the name of the table to insert data into + * @return BatchWriter object + * @throws TableNotFoundException + * if table not found + */ + public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException; + + /** + * Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch + * writers can queue data for multiple tables. Also data for multiple tables can be sent to a + * server in a single batch. Its an efficient way to ingest data into multiple tables from a + * single process. + * + * @param config + * configuration used to create multi-table batch writer. This config will take + * precedence. Any unset values will merged with config set when the AccumuloClient was + * created. If no config was set during AccumuloClient creation, BatchWriterConfig + * defaults will be used. + * @return MultiTableBatchWriter object for configuring and writing data to + */ + public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config); + + /** + * Factory method to create a Multi-Table BatchWriter. This method uses BatchWriterConfig set when + * AccumuloClient was created. If none was set, BatchWriterConfig defaults will be used. + * + * @return MultiTableBatchWriter object + */ + public MultiTableBatchWriter createMultiTableBatchWriter(); + + /** + * Factory method to create a Scanner connected to Accumulo. + * + * @param tableName + * the name of the table to query data from + * @param authorizations + * A set of authorization labels that will be checked against the column visibility of + * each key in order to filter data. The authorizations passed in must be a subset of the + * accumulo user's set of authorizations. If the accumulo user has authorizations (A1, + * A2) and authorizations (A2, A3) are passed, then an exception will be thrown. + * + * @return Scanner object for configuring and querying data with + * @throws TableNotFoundException + * when the specified table doesn't exist + */ + public Scanner createScanner(String tableName, Authorizations authorizations) + throws TableNotFoundException; + + /** + * Factory method to create a ConditionalWriter connected to Accumulo. + * + * @param tableName + * the name of the table to query data from + * @param config + * configuration used to create conditional writer + * + * @return ConditionalWriter object for writing ConditionalMutations + * @throws TableNotFoundException + * when the specified table doesn't exist + */ + public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) + throws TableNotFoundException; + + /** + * Get the current user for this AccumuloClient + * + * @return the user name + */ + public String whoami(); + + /** + * Returns a unique string that identifies this instance of accumulo. + * + * @return a UUID + */ + public String getInstanceID(); + + /** + * Retrieves a TableOperations object to perform table functions, such as create and delete. + * + * @return an object to manipulate tables + */ + public abstract TableOperations tableOperations(); + + /** + * Retrieves a NamespaceOperations object to perform namespace functions, such as create and + * delete. + * + * @return an object to manipulate namespaces + */ + public NamespaceOperations namespaceOperations(); + + /** + * Retrieves a SecurityOperations object to perform user security operations, such as creating + * users. + * + * @return an object to modify users and permissions + */ + public SecurityOperations securityOperations(); + + /** + * Retrieves an InstanceOperations object to modify instance configuration. + * + * @return an object to modify instance configuration + */ + public InstanceOperations instanceOperations(); + + /** + * Retrieves a ReplicationOperations object to manage replication configuration. + * + * @return an object to modify replication configuration + */ + public ReplicationOperations replicationOperations(); + + /** + * @return {@link ClientInfo} which contains information about client connection to Accumulo + */ + public abstract ClientInfo info(); + + /** + * Change user + * + * @param principal + * Principal/username + * @param token + * Authentication token + * @return {@link AccumuloClient} for new user + */ + public abstract AccumuloClient changeUser(String principal, AuthenticationToken token) + throws AccumuloSecurityException, AccumuloException; + + /** + * Builds ClientInfo after all options have been specified + */ + public interface ClientInfoFactory { + + /** + * Builds ClientInfo after all options have been specified + * + * @return ClientInfo + */ + ClientInfo info(); + } + + /** + * Builds AccumuloClient + */ + public interface AccumuloClientFactory extends ClientInfoFactory { + + /** + * Builds AccumuloClient after all options have been specified + * + * @return AccumuloClient + */ + AccumuloClient build() throws AccumuloException, AccumuloSecurityException; + + } + + /** + * Builder method for setting Accumulo instance and zookeepers + */ + public interface InstanceArgs { + AuthenticationArgs forInstance(String instanceName, String zookeepers); + } + + /** + * Builder methods for creating AccumuloClient using properties + */ + public interface PropertyOptions extends InstanceArgs { + + /** + * Build using properties file. An example properties file can be found at + * conf/accumulo-client.properties in the Accumulo tarball distribution. + * + * @param propertiesFile + * Path to properties file + * @return this builder + */ + AccumuloClientFactory usingProperties(String propertiesFile); + + /** + * Build using Java properties object. A list of available properties can be found in the + * documentation on the project website (http://accumulo.apache.org) under 'Development' -> + * 'Client Properties' + * + * @param properties + * Properties object + * @return this builder + */ + AccumuloClientFactory usingProperties(Properties properties); + } + + public interface ClientInfoOptions extends PropertyOptions { + + /** + * Build using Accumulo client information + * + * @param clientInfo + * ClientInfo object + * @return this builder + */ + FromOptions usingClientInfo(ClientInfo clientInfo); + } + + /** + * Build methods for authentication + */ + public interface AuthenticationArgs { + + /** + * Build using password-based credentials + * + * @param username + * User name + * @param password + * Password + * @return this builder + */ + ConnectionOptions usingPassword(String username, CharSequence password); + + /** + * Build using Kerberos credentials + * + * @param principal + * Principal + * @param keyTabFile + * Path to keytab file + * @return this builder + */ + ConnectionOptions usingKerberos(String principal, String keyTabFile); + + /** + * Build using specified credentials + * + * @param principal + * Principal/username + * @param token + * Authentication token + * @return this builder + */ + ConnectionOptions usingToken(String principal, AuthenticationToken token); + } + + /** + * Build methods for SSL/TLS + */ + public interface SslOptions extends AccumuloClientFactory { + + /** + * Build with SSL trust store + * + * @param path + * Path to trust store + * @return this builder + */ + SslOptions withTruststore(String path); + + /** + * Build with SSL trust store + * + * @param path + * Path to trust store + * @param password + * Password used to encrypt trust store + * @param type + * Trust store type + * @return this builder + */ + SslOptions withTruststore(String path, String password, String type); + + /** + * Build with SSL key store + * + * @param path + * Path to SSL key store + * @return this builder + */ + SslOptions withKeystore(String path); + + /** + * Build with SSL key store + * + * @param path + * Path to keystore + * @param password + * Password used to encrypt key store + * @param type + * Key store type + * @return this builder + */ + SslOptions withKeystore(String path, String password, String type); + + /** + * Use JSSE system properties to configure SSL + * + * @return this builder + */ + SslOptions useJsse(); + } + + /** + * Build methods for SASL + */ + public interface SaslOptions extends AccumuloClientFactory { + + /** + * Build with Kerberos Server Primary + * + * @param kerberosServerPrimary + * Kerberos server primary + * @return this builder + */ + SaslOptions withPrimary(String kerberosServerPrimary); + + /** + * Build with SASL quality of protection + * + * @param qualityOfProtection + * Quality of protection + * @return this builder + */ + SaslOptions withQop(String qualityOfProtection); + } + + /** + * Build methods for connection options + */ + public interface ConnectionOptions extends AccumuloClientFactory { + + /** + * Build using Zookeeper timeout + * + * @param timeout + * Zookeeper timeout (in milliseconds) + * @return this builder + */ + ConnectionOptions withZkTimeout(int timeout); + + /** + * Build with SSL/TLS options + * + * @return this builder + */ + SslOptions withSsl(); + + /** + * Build with SASL options + * + * @return this builder + */ + SaslOptions withSasl(); + + /** + * Build with BatchWriterConfig defaults for BatchWriter, MultiTableBatchWriter & + * BatchDeleter + * + * @param batchWriterConfig + * BatchWriterConfig + * @return this builder + */ + ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig); + + /** + * Build with default number of query threads for BatchScanner + */ + ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads); + + /** + * Build with default batch size for Scanner + */ + ConnectionOptions withScannerBatchSize(int batchSize); + } + + public interface FromOptions extends ConnectionOptions, PropertyOptions, AuthenticationArgs { + + } + + /** + * Creates builder for AccumuloClient. + * + * @return this builder + */ + public static ClientInfoOptions builder() { + return new AccumuloClientImpl.AccumuloClientBuilderImpl(); + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloClientImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloClientImpl.java new file mode 100644 index 0000000..82ced55 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/AccumuloClientImpl.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchDeleter; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.ClientInfo; +import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MultiTableBatchWriter; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.admin.InstanceOperations; +import org.apache.accumulo.core.client.admin.NamespaceOperations; +import org.apache.accumulo.core.client.admin.ReplicationOperations; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; +import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; +import org.apache.accumulo.core.conf.ClientProperty; +import org.apache.accumulo.core.master.state.tables.TableState; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.trace.Tracer; + +public class AccumuloClientImpl extends Connector implements AccumuloClient { + private static final String SYSTEM_TOKEN_NAME = "org.apache.accumulo.server.security." + + "SystemCredentials$SystemToken"; + private final ClientContext context; + private final String instanceID; + private SecurityOperations secops = null; + private TableOperationsImpl tableops = null; + private NamespaceOperations namespaceops = null; + private InstanceOperations instanceops = null; + private ReplicationOperations replicationops = null; + + public AccumuloClientImpl(final ClientContext context) + throws AccumuloSecurityException, AccumuloException { + checkArgument(context != null, "Context is null"); + checkArgument(context.getCredentials() != null, "Credentials are null"); + checkArgument(context.getCredentials().getToken() != null, "Authentication token is null"); + if (context.getCredentials().getToken().isDestroyed()) + throw new AccumuloSecurityException(context.getCredentials().getPrincipal(), + SecurityErrorCode.TOKEN_EXPIRED); + + this.context = context; + instanceID = context.getInstanceID(); + + // Skip fail fast for system services; string literal for class name, to avoid dependency on + // server jar + final String tokenClassName = context.getCredentials().getToken().getClass().getName(); + if (!SYSTEM_TOKEN_NAME.equals(tokenClassName)) { + ServerClient.executeVoid(context, iface -> { + if (!iface.authenticate(Tracer.traceInfo(), context.rpcCreds())) + throw new AccumuloSecurityException("Authentication failed, access denied", + SecurityErrorCode.BAD_CREDENTIALS); + }); + } + + this.tableops = new TableOperationsImpl(context); + this.namespaceops = new NamespaceOperationsImpl(context, tableops); + } + + private Table.ID getTableId(String tableName) throws TableNotFoundException { + Table.ID tableId = Tables.getTableId(context, tableName); + if (Tables.getTableState(context, tableId) == TableState.OFFLINE) + throw new TableOfflineException(Tables.getTableOfflineMsg(context, tableId)); + return tableId; + } + + @Override + @Deprecated + public org.apache.accumulo.core.client.Instance getInstance() { + return context.getDeprecatedInstance(); + } + + @Override + public BatchScanner createBatchScanner(String tableName, Authorizations authorizations, + int numQueryThreads) throws TableNotFoundException { + checkArgument(tableName != null, "tableName is null"); + checkArgument(authorizations != null, "authorizations is null"); + return new TabletServerBatchReader(context, getTableId(tableName), authorizations, + numQueryThreads); + } + + @Override + public BatchScanner createBatchScanner(String tableName, Authorizations authorizations) + throws TableNotFoundException { + Integer numQueryThreads = ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS + .getInteger(context.getClientInfo().getProperties()); + Objects.requireNonNull(numQueryThreads); + return createBatchScanner(tableName, authorizations, numQueryThreads); + } + + @Deprecated + @Override + public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, + int numQueryThreads, long maxMemory, long maxLatency, int maxWriteThreads) + throws TableNotFoundException { + checkArgument(tableName != null, "tableName is null"); + checkArgument(authorizations != null, "authorizations is null"); + return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, + numQueryThreads, new BatchWriterConfig().setMaxMemory(maxMemory) + .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); + } + + @Override + public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, + int numQueryThreads, BatchWriterConfig config) throws TableNotFoundException { + checkArgument(tableName != null, "tableName is null"); + checkArgument(authorizations != null, "authorizations is null"); + return new TabletServerBatchDeleter(context, getTableId(tableName), authorizations, + numQueryThreads, config.merge(context.getBatchWriterConfig())); + } + + @Override + public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, + int numQueryThreads) throws TableNotFoundException { + return createBatchDeleter(tableName, authorizations, numQueryThreads, new BatchWriterConfig()); + } + + @Deprecated + @Override + public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, + int maxWriteThreads) throws TableNotFoundException { + checkArgument(tableName != null, "tableName is null"); + return new BatchWriterImpl(context, getTableId(tableName), + new BatchWriterConfig().setMaxMemory(maxMemory) + .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); + } + + @Override + public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) + throws TableNotFoundException { + checkArgument(tableName != null, "tableName is null"); + // we used to allow null inputs for bw config + if (config == null) { + config = new BatchWriterConfig(); + } + return new BatchWriterImpl(context, getTableId(tableName), + config.merge(context.getBatchWriterConfig())); + } + + @Override + public BatchWriter createBatchWriter(String tableName) throws TableNotFoundException { + return createBatchWriter(tableName, new BatchWriterConfig()); + } + + @Deprecated + @Override + public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, + int maxWriteThreads) { + return new MultiTableBatchWriterImpl(context, new BatchWriterConfig().setMaxMemory(maxMemory) + .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads)); + } + + @Override + public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) { + return new MultiTableBatchWriterImpl(context, config.merge(context.getBatchWriterConfig())); + } + + @Override + public MultiTableBatchWriter createMultiTableBatchWriter() { + return createMultiTableBatchWriter(new BatchWriterConfig()); + } + + @Override + public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) + throws TableNotFoundException { + return new ConditionalWriterImpl(context, getTableId(tableName), config); + } + + @Override + public Scanner createScanner(String tableName, Authorizations authorizations) + throws TableNotFoundException { + checkArgument(tableName != null, "tableName is null"); + checkArgument(authorizations != null, "authorizations is null"); + Scanner scanner = new ScannerImpl(context, getTableId(tableName), authorizations); + Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE + .getInteger(context.getClientInfo().getProperties()); + if (batchSize != null) { + scanner.setBatchSize(batchSize); + } + return scanner; + } + + @Override + public String whoami() { + return context.getCredentials().getPrincipal(); + } + + @Override + public String getInstanceID() { + return instanceID; + } + + @Override + public synchronized TableOperations tableOperations() { + return tableops; + } + + @Override + public synchronized NamespaceOperations namespaceOperations() { + return namespaceops; + } + + @Override + public synchronized SecurityOperations securityOperations() { + if (secops == null) + secops = new SecurityOperationsImpl(context); + + return secops; + } + + @Override + public synchronized InstanceOperations instanceOperations() { + if (instanceops == null) + instanceops = new InstanceOperationsImpl(context); + + return instanceops; + } + + @Override + public synchronized ReplicationOperations replicationOperations() { + if (null == replicationops) { + replicationops = new ReplicationOperationsImpl(context); + } + + return replicationops; + } + + @Override + public ClientInfo info() { + return this.context.getClientInfo(); + } + + @Override + public AccumuloClient changeUser(String principal, AuthenticationToken token) + throws AccumuloSecurityException, AccumuloException { + return AccumuloClient.builder().usingClientInfo(info()).usingToken(principal, token).build(); + } + + public static class AccumuloClientBuilderImpl + implements InstanceArgs, PropertyOptions, ClientInfoOptions, AuthenticationArgs, + ConnectionOptions, SslOptions, SaslOptions, AccumuloClientFactory, FromOptions { + + private Properties properties = new Properties(); + private AuthenticationToken token = null; + + private ClientInfo getClientInfo() { + if (token != null) { + return new ClientInfoImpl(properties, token); + } + return new ClientInfoImpl(properties); + } + + @Override + public AccumuloClient build() throws AccumuloException, AccumuloSecurityException { + return org.apache.accumulo.core.client.impl.ClientInfoFactory.getConnector(getClientInfo()); + } + + @Override + public ClientInfo info() { + return getClientInfo(); + } + + @Override + public AuthenticationArgs forInstance(String instanceName, String zookeepers) { + setProperty(ClientProperty.INSTANCE_NAME, instanceName); + setProperty(ClientProperty.INSTANCE_ZOOKEEPERS, zookeepers); + return this; + } + + @Override + public SslOptions withTruststore(String path) { + setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path); + return this; + } + + @Override + public SslOptions withTruststore(String path, String password, String type) { + setProperty(ClientProperty.SSL_TRUSTSTORE_PATH, path); + setProperty(ClientProperty.SSL_TRUSTSTORE_PASSWORD, password); + setProperty(ClientProperty.SSL_TRUSTSTORE_TYPE, type); + return this; + } + + @Override + public SslOptions withKeystore(String path) { + setProperty(ClientProperty.SSL_KEYSTORE_PATH, path); + return this; + } + + @Override + public SslOptions withKeystore(String path, String password, String type) { + setProperty(ClientProperty.SSL_KEYSTORE_PATH, path); + setProperty(ClientProperty.SSL_KEYSTORE_PASSWORD, password); + setProperty(ClientProperty.SSL_KEYSTORE_TYPE, type); + return this; + } + + @Override + public SslOptions useJsse() { + setProperty(ClientProperty.SSL_USE_JSSE, "true"); + return this; + } + + @Override + public ConnectionOptions withZkTimeout(int timeout) { + setProperty(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT, Integer.toString(timeout) + "ms"); + return this; + } + + @Override + public SslOptions withSsl() { + setProperty(ClientProperty.SSL_ENABLED, "true"); + return this; + } + + @Override + public SaslOptions withSasl() { + setProperty(ClientProperty.SASL_ENABLED, "true"); + return this; + } + + @Override + public ConnectionOptions withBatchWriterConfig(BatchWriterConfig batchWriterConfig) { + setProperty(ClientProperty.BATCH_WRITER_MAX_MEMORY_BYTES, batchWriterConfig.getMaxMemory()); + setProperty(ClientProperty.BATCH_WRITER_MAX_LATENCY_SEC, + batchWriterConfig.getMaxLatency(TimeUnit.SECONDS)); + setProperty(ClientProperty.BATCH_WRITER_MAX_TIMEOUT_SEC, + batchWriterConfig.getTimeout(TimeUnit.SECONDS)); + setProperty(ClientProperty.BATCH_WRITER_MAX_WRITE_THREADS, + batchWriterConfig.getMaxWriteThreads()); + setProperty(ClientProperty.BATCH_WRITER_DURABILITY, + batchWriterConfig.getDurability().toString()); + return this; + } + + @Override + public ConnectionOptions withBatchScannerQueryThreads(int numQueryThreads) { + setProperty(ClientProperty.BATCH_SCANNER_NUM_QUERY_THREADS, numQueryThreads); + return this; + } + + @Override + public ConnectionOptions withScannerBatchSize(int batchSize) { + setProperty(ClientProperty.SCANNER_BATCH_SIZE, batchSize); + return this; + } + + @Override + public SaslOptions withPrimary(String kerberosServerPrimary) { + setProperty(ClientProperty.SASL_KERBEROS_SERVER_PRIMARY, kerberosServerPrimary); + return this; + } + + @Override + public SaslOptions withQop(String qualityOfProtection) { + setProperty(ClientProperty.SASL_QOP, qualityOfProtection); + return this; + } + + @Override + public AccumuloClientFactory usingProperties(String configFile) { + Properties properties = new Properties(); + try (InputStream is = new FileInputStream(configFile)) { + properties.load(is); + } catch (IOException e) { + throw new IllegalArgumentException(e); + } + return usingProperties(properties); + } + + @Override + public AccumuloClientFactory usingProperties(Properties properties) { + this.properties = properties; + return this; + } + + @Override + public ConnectionOptions usingPassword(String principal, CharSequence password) { + setProperty(ClientProperty.AUTH_PRINCIPAL, principal); + ClientProperty.setPassword(properties, password.toString()); + return this; + } + + @Override + public ConnectionOptions usingKerberos(String principal, String keyTabFile) { + setProperty(ClientProperty.AUTH_PRINCIPAL, principal); + ClientProperty.setKerberosKeytab(properties, keyTabFile); + return this; + } + + @Override + public ConnectionOptions usingToken(String principal, AuthenticationToken token) { + setProperty(ClientProperty.AUTH_PRINCIPAL, principal); + this.token = token; + return this; + } + + @Override + public FromOptions usingClientInfo(ClientInfo clientInfo) { + this.properties = clientInfo.getProperties(); + return this; + } + + public void setProperty(ClientProperty property, String value) { + properties.setProperty(property.getKey(), value); + } + + public void setProperty(ClientProperty property, Long value) { + setProperty(property, Long.toString(value)); + } + + public void setProperty(ClientProperty property, Integer value) { + setProperty(property, Integer.toString(value)); + } + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java new file mode 100644 index 0000000..cbaeaf7 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.functional; + +import java.util.Properties; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.ClientInfo; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.conf.ClientProperty; +import org.apache.accumulo.harness.AccumuloClusterHarness; +import org.junit.Assert; +import org.junit.Test; + +public class AccumuloClientIT extends AccumuloClusterHarness { + + @Test + public void testConnectorBuilder() throws Exception { + AccumuloClient c = getAccumuloClient(); + String instanceName = c.info().getInstanceName(); + String zookeepers = c.info().getZooKeepers(); + final String user = "testuser"; + final String password = "testpassword"; + c.securityOperations().createLocalUser(user, new PasswordToken(password)); + + AccumuloClient conn = AccumuloClient.builder().forInstance(instanceName, zookeepers) + .usingPassword(user, password).withZkTimeout(1234).build(); + + Assert.assertEquals(instanceName, conn.info().getInstanceName()); + Assert.assertEquals(zookeepers, conn.info().getZooKeepers()); + Assert.assertEquals(user, conn.whoami()); + Assert.assertEquals(1234, conn.info().getZooKeepersSessionTimeOut()); + + ClientInfo info = AccumuloClient.builder().forInstance(instanceName, zookeepers) + .usingPassword(user, password).info(); + Assert.assertEquals(instanceName, info.getInstanceName()); + Assert.assertEquals(zookeepers, info.getZooKeepers()); + Assert.assertEquals(user, info.getPrincipal()); + Assert.assertTrue(info.getAuthenticationToken() instanceof PasswordToken); + + Properties props = new Properties(); + props.put(ClientProperty.INSTANCE_NAME.getKey(), instanceName); + props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey(), zookeepers); + props.put(ClientProperty.AUTH_PRINCIPAL.getKey(), user); + props.put(ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT.getKey(), "22s"); + ClientProperty.setPassword(props, password); + conn = AccumuloClient.builder().usingProperties(props).build(); + + Assert.assertEquals(instanceName, conn.info().getInstanceName()); + Assert.assertEquals(zookeepers, conn.info().getZooKeepers()); + Assert.assertEquals(user, conn.whoami()); + Assert.assertEquals(22000, conn.info().getZooKeepersSessionTimeOut()); + + final String user2 = "testuser2"; + final String password2 = "testpassword2"; + c.securityOperations().createLocalUser(user2, new PasswordToken(password2)); + + AccumuloClient conn2 = AccumuloClient.builder().usingClientInfo(conn.info()) + .usingToken(user2, new PasswordToken(password2)).build(); + Assert.assertEquals(instanceName, conn2.info().getInstanceName()); + Assert.assertEquals(zookeepers, conn2.info().getZooKeepers()); + Assert.assertEquals(user2, conn2.whoami()); + info = conn2.info(); + Assert.assertEquals(instanceName, info.getInstanceName()); + Assert.assertEquals(zookeepers, info.getZooKeepers()); + Assert.assertEquals(user2, info.getPrincipal()); + + final String user3 = "testuser3"; + final String password3 = "testpassword3"; + c.securityOperations().createLocalUser(user3, new PasswordToken(password3)); + + AccumuloClient conn3 = conn.changeUser(user3, new PasswordToken(password3)); + Assert.assertEquals(instanceName, conn3.info().getInstanceName()); + Assert.assertEquals(zookeepers, conn3.info().getZooKeepers()); + Assert.assertEquals(user3, conn3.whoami()); + info = conn3.info(); + Assert.assertEquals(instanceName, info.getInstanceName()); + Assert.assertEquals(zookeepers, info.getZooKeepers()); + Assert.assertEquals(user3, info.getPrincipal()); + } +}