On Thu, Nov 7, 2013 at 8:39 PM, <[email protected]> wrote:
> Updated Branches:
> refs/heads/ACCUMULO-1833-caching [created] 3b6eade61
>
>
> ACCUMULO-1833 Rework the getBatchWriter method on MTBW to remove
> zookeeper lock contention and get better concurrent throughput.
>
>
> Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
> Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cba87980
> Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cba87980
> Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cba87980
>
> Branch: refs/heads/ACCUMULO-1833-caching
> Commit: cba87980cbd731338c58f05734ebb3d3e683b440
> Parents: 060188a
> Author: Josh Elser <[email protected]>
> Authored: Thu Nov 7 16:49:41 2013 -0500
> Committer: Josh Elser <[email protected]>
> Committed: Thu Nov 7 16:49:41 2013 -0500
>
> ----------------------------------------------------------------------
> core/pom.xml | 4 +
> .../apache/accumulo/core/client/Connector.java | 44 ++++++-
> .../core/client/impl/ConnectorImpl.java | 12 ++
> .../client/impl/MultiTableBatchWriterImpl.java | 116 ++++++++++++++-----
> .../core/client/mock/MockConnector.java | 11 ++
> 5 files changed, 159 insertions(+), 28 deletions(-)
> ----------------------------------------------------------------------
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/pom.xml
> ----------------------------------------------------------------------
> diff --git a/core/pom.xml b/core/pom.xml
> index f7539f5..d02a3cd 100644
> --- a/core/pom.xml
> +++ b/core/pom.xml
> @@ -30,6 +30,10 @@
> <artifactId>jcommander</artifactId>
> </dependency>
> <dependency>
> + <groupId>com.google.guava</groupId>
> + <artifactId>guava</artifactId>
> + </dependency>
> + <dependency>
> <groupId>jline</groupId>
> <artifactId>jline</artifactId>
> </dependency>
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> ----------------------------------------------------------------------
> diff --git
> a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> index d2e7321..68dc881 100644
> --- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> +++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java
> @@ -16,6 +16,8 @@
> */
> package org.apache.accumulo.core.client;
>
> +import java.util.concurrent.TimeUnit;
> +
> import org.apache.accumulo.core.client.admin.InstanceOperations;
> import org.apache.accumulo.core.client.admin.SecurityOperations;
> import org.apache.accumulo.core.client.admin.TableOperations;
> @@ -146,8 +148,32 @@ public abstract class Connector {
> public abstract MultiTableBatchWriter createMultiTableBatchWriter(long
> maxMemory, long maxLatency, int maxWriteThreads);
>
> /**
> + * Factory method to create a Multi-Table BatchWriter connected to
> Accumulo. Multi-table batch writers can queue data for multiple tables,
> which is good for
> + * ingesting data into multiple tables from the same source. Caching of
> ZooKeeper table information defaults to {@link
> MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME}
> + * and {@link MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME_UNIT}
> + *
> + * @param maxMemory
> + * size in bytes of the maximum memory to batch before writing
> + * @param maxLatency
> + * size in milliseconds; set to 0 or Long.MAX_VALUE to allow
> the maximum time to hold a batch before writing
> + * @param maxWriteThreads
> + * the maximum number of threads to use for writing data to
> the tablet servers
> + * @param cacheTime
> + * Duration of time to cache ZooKeeper table information
> + * @param cacheTimeUnit
> + * Unit of time to apply to {@link cacheTime}
> + *
> + * @return MultiTableBatchWriter object for configuring and writing
> data to
> + * @deprecated since 1.5.0; Use {@link
> #createMultiTableBatchWriter(BatchWriterConfig)} instead.
> + * @since 1.5.1
> + */
> + @Deprecated
> + public abstract MultiTableBatchWriter createMultiTableBatchWriter(long
> maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit
> cacheTimeUnit);
>
I don't think this change needs to impact the public Connector API. I suspect
that just adding a cache with a really short timeout (like 50ms to 100ms) will
give us the performance benefit we are looking for. Also, making API changes in
a bug-fix release means it's possible to write something against 1.5.1 that will
not work with 1.5.0.
> +
> + /**
> * Factory method to create a Multi-Table BatchWriter connected to
> Accumulo. Multi-table batch writers can queue data for multiple tables.
> Also data for
> - * multiple tables can be sent to a server in a single batch. Its an
> efficient way to ingest data into multiple tables from a single process.
> + * multiple tables can be sent to a server in a single batch. Its an
> efficient way to ingest data into multiple tables from a single process.
> Caching
> + * of ZooKeeper table information defaults to {@link
> MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME} and {@link
> MultiTableBatchWriterImpl#DEFAULT_CACHE_TIME_UNIT}
> *
> * @param config
> * configuration used to create multi-table batch writer
> @@ -158,6 +184,22 @@ public abstract class Connector {
> public abstract MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config);
>
> /**
> + * Factory method to create a Multi-Table BatchWriter connected to
> Accumulo. Multi-table batch writers can queue data for multiple tables.
> Also data for
> + * multiple tables can be sent to a server in a single batch. Its an
> efficient way to ingest data into multiple tables from a single process.
> This method
> + * also allows the user to provide parameters as to how long table
> information from ZooKeeper is cached.
> + * @param config
> + * configuration used to create the multi-table batch writer
> + * @param cacheTime
> + * Duration of time to cache ZooKeeper table information
> + * @param cacheTimeUnit
> + * Unit of time to apply to {@link cacheTime}
> + * @return
> + * @since 1.5.1
> + */
> + public abstract MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config, long cacheTime,
> TimeUnit cacheTimeUnit);
> +
> +
> + /**
> * Factory method to create a Scanner connected to Accumulo.
> *
> * @param tableName
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> ----------------------------------------------------------------------
> diff --git
> a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> index 1702082..89d2813 100644
> ---
> a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> +++
> b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
> @@ -126,12 +126,24 @@ public class ConnectorImpl extends Connector {
> .setMaxLatency(maxLatency,
> TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
> }
>
> + @Deprecated
> + @Override
> + public MultiTableBatchWriter createMultiTableBatchWriter(long
> maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit
> cacheTimeUnit) {
> + return new MultiTableBatchWriterImpl(instance, credentials, new
> BatchWriterConfig().setMaxMemory(maxMemory)
> + .setMaxLatency(maxLatency,
> TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads), cacheTime,
> cacheTimeUnit);
> + }
> +
> @Override
> public MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config) {
> return new MultiTableBatchWriterImpl(instance, credentials, config);
> }
>
> @Override
> + public MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config, long timeToCache,
> TimeUnit timeUnit) {
> + return new MultiTableBatchWriterImpl(instance, credentials, config,
> timeToCache, timeUnit);
> + }
> +
> + @Override
> public Scanner createScanner(String tableName, Authorizations
> authorizations) throws TableNotFoundException {
> ArgumentChecker.notNull(tableName, authorizations);
> return new ScannerImpl(instance, credentials, getTableId(tableName),
> authorizations);
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> ----------------------------------------------------------------------
> diff --git
> a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> index 4537ae8..06b6f75 100644
> ---
> a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> +++
> b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
> @@ -16,7 +16,9 @@
> */
> package org.apache.accumulo.core.client.impl;
>
> -import java.util.HashMap;
> +import java.util.concurrent.ConcurrentHashMap;
> +import java.util.concurrent.ExecutionException;
> +import java.util.concurrent.TimeUnit;
>
> import org.apache.accumulo.core.client.AccumuloException;
> import org.apache.accumulo.core.client.AccumuloSecurityException;
> @@ -33,62 +35,97 @@ import
> org.apache.accumulo.core.security.thrift.TCredentials;
> import org.apache.accumulo.core.util.ArgumentChecker;
> import org.apache.log4j.Logger;
>
> +import com.google.common.cache.CacheBuilder;
> +import com.google.common.cache.CacheLoader;
> +import com.google.common.cache.LoadingCache;
> +
> public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
> + public static final long DEFAULT_CACHE_TIME = 60;
> + public static final TimeUnit DEFAULT_CACHE_TIME_UNIT = TimeUnit.SECONDS;
> +
> static final Logger log =
> Logger.getLogger(MultiTableBatchWriterImpl.class);
> private boolean closed;
> -
> +
> private class TableBatchWriter implements BatchWriter {
> -
> +
> private String table;
> -
> +
> TableBatchWriter(String table) {
> this.table = table;
> }
> -
> +
> @Override
> public void addMutation(Mutation m) throws MutationsRejectedException
> {
> ArgumentChecker.notNull(m);
> bw.addMutation(table, m);
> }
> -
> +
> @Override
> public void addMutations(Iterable<Mutation> iterable) throws
> MutationsRejectedException {
> bw.addMutation(table, iterable.iterator());
> }
> -
> +
> @Override
> public void close() {
> throw new UnsupportedOperationException("Must close all tables, can
> not close an individual table");
> }
> -
> +
> @Override
> public void flush() {
> throw new UnsupportedOperationException("Must flush all tables, can
> not flush an individual table");
> }
> -
> +
> }
> -
> +
> + /**
> + * CacheLoader which will look up the internal table ID for a given
> table name.
> + */
> + private class TableNameToIdLoader extends CacheLoader<String,String> {
> +
> + @Override
> + public String load(String tableName) throws Exception {
> + String tableId = Tables.getNameToIdMap(instance).get(tableName);
> +
> + if (tableId == null)
> + throw new TableNotFoundException(tableId, tableName, null);
> +
> + if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
> + throw new TableOfflineException(instance, tableId);
> +
> + return tableId;
> + }
> +
> + }
> +
> private TabletServerBatchWriter bw;
> - private HashMap<String,BatchWriter> tableWriters;
> + private ConcurrentHashMap<String,BatchWriter> tableWriters;
> private Instance instance;
> -
> + private final LoadingCache<String,String> nameToIdCache;
> +
> public MultiTableBatchWriterImpl(Instance instance, TCredentials
> credentials, BatchWriterConfig config) {
> - ArgumentChecker.notNull(instance, credentials);
> + this(instance, credentials, config, DEFAULT_CACHE_TIME,
> DEFAULT_CACHE_TIME_UNIT);
> + }
> +
> + public MultiTableBatchWriterImpl(Instance instance, TCredentials
> credentials, BatchWriterConfig config, long cacheTime, TimeUnit
> cacheTimeUnit) {
> + ArgumentChecker.notNull(instance, credentials, config, cacheTimeUnit);
> this.instance = instance;
> this.bw = new TabletServerBatchWriter(instance, credentials, config);
> - tableWriters = new HashMap<String,BatchWriter>();
> + tableWriters = new ConcurrentHashMap<String,BatchWriter>();
> this.closed = false;
> +
> + nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime,
> cacheTimeUnit).concurrencyLevel(8).maximumSize(64).initialCapacity(16)
> + .build(new TableNameToIdLoader());
> }
> -
> +
> public boolean isClosed() {
> return this.closed;
> }
> -
> +
> public void close() throws MutationsRejectedException {
> bw.close();
> this.closed = true;
> }
> -
> +
> /**
> * Warning: do not rely upon finalize to close this class. Finalize is
> not guaranteed to be called.
> */
> @@ -105,16 +142,41 @@ public class MultiTableBatchWriterImpl implements
> MultiTableBatchWriter {
> }
> }
>
> + /**
> + * Returns the table ID for the given table name.
> + * @param tableName The name of the table which to find the ID for
> + * @return The table ID, or null if the table name doesn't exist
> + */
> + private String getId(String tableName) throws TableNotFoundException {
> + try {
> + return nameToIdCache.get(tableName);
> + } catch (ExecutionException e) {
> + Throwable cause = e.getCause();
> +
> + if (null == cause) {
> + throw new RuntimeException(e);
> + }
> +
> + if (cause instanceof TableNotFoundException) {
> +
> + throw (TableNotFoundException) cause;
> + }
> +
> + if (cause instanceof TableOfflineException) {
> + throw (TableOfflineException) cause;
> + }
> +
> + log.error("Unexpected exception when fetching table id for " +
> tableName);
> +
> + throw new RuntimeException(e);
> + }
> + }
> +
> @Override
> - public synchronized BatchWriter getBatchWriter(String tableName) throws
> AccumuloException, AccumuloSecurityException, TableNotFoundException {
> + public BatchWriter getBatchWriter(String tableName) throws
> AccumuloException, AccumuloSecurityException, TableNotFoundException {
> ArgumentChecker.notNull(tableName);
> - String tableId = Tables.getNameToIdMap(instance).get(tableName);
> - if (tableId == null)
> - throw new TableNotFoundException(tableId, tableName, null);
> -
> - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
> - throw new TableOfflineException(instance, tableId);
> -
> + String tableId = getId(tableName);
> +
> BatchWriter tbw = tableWriters.get(tableId);
> if (tbw == null) {
> tbw = new TableBatchWriter(tableId);
> @@ -122,10 +184,10 @@ public class MultiTableBatchWriterImpl implements
> MultiTableBatchWriter {
> }
> return tbw;
> }
> -
> +
> @Override
> public void flush() throws MutationsRejectedException {
> bw.flush();
> }
> -
> +
> }
>
>
> http://git-wip-us.apache.org/repos/asf/accumulo/blob/cba87980/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> ----------------------------------------------------------------------
> diff --git
> a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> index 1179559..2aa6291 100644
> ---
> a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> +++
> b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
> @@ -90,12 +90,23 @@ public class MockConnector extends Connector {
> return new MockMultiTableBatchWriter(acu);
> }
>
> + @Deprecated
> + @Override
> + public MultiTableBatchWriter createMultiTableBatchWriter(long
> maxMemory, long maxLatency, int maxWriteThreads, long cacheTime, TimeUnit
> cacheTimeUnit) {
> + return new MockMultiTableBatchWriter(acu);
> + }
> +
> @Override
> public MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config) {
> return createMultiTableBatchWriter(config.getMaxMemory(),
> config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads());
> }
>
> @Override
> + public MultiTableBatchWriter
> createMultiTableBatchWriter(BatchWriterConfig config, long cacheTime,
> TimeUnit cacheTimeUnit) {
> + return createMultiTableBatchWriter(config.getMaxMemory(),
> config.getMaxLatency(TimeUnit.MILLISECONDS), config.getMaxWriteThreads(),
> cacheTime, cacheTimeUnit);
> + }
> +
> + @Override
> public Scanner createScanner(String tableName, Authorizations
> authorizations) throws TableNotFoundException {
> MockTable table = acu.tables.get(tableName);
> if (table == null)
>
>