Thanks, Keith. You're right. It looks like, while MultiTableBatchWriterImpl -> TabletServerBatchWriter -> Map<String,TabletLocator> locators are distinct instances, the TabletLocator loaded into the locators map is static:

    private static HashMap<LocatorKey,TabletLocator> locators = new HashMap<LocatorKey,TabletLocator>();
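To make that concrete for anyone reading along later, here is a rough, self-contained sketch of what the pattern amounts to. The class and method names below are placeholders of mine, not the actual client classes; the point is just the static field shared by every writer in the JVM:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative sketch only: why every BatchWriter in a JVM ends up sharing
    // one tablet-location cache per (instance, table), barring separate
    // classloaders. Names are placeholders, not the real Accumulo classes.
    public class SharedLocatorCacheSketch {

      // JVM-wide map; the static field is what makes the cache shared across
      // otherwise independent MultiTableBatchWriter/TabletServerBatchWriter
      // instances.
      private static final Map<String, TabletLocatorSketch> LOCATORS = new HashMap<>();

      public static synchronized TabletLocatorSketch getLocator(String instanceId, String tableId) {
        String key = instanceId + "/" + tableId;
        TabletLocatorSketch locator = LOCATORS.get(key);
        if (locator == null) {
          // Only the first caller in the JVM builds (and later populates) the
          // locator; every other writer gets the same cached instance back.
          locator = new TabletLocatorSketch();
          LOCATORS.put(key, locator);
        }
        return locator;
      }

      // Stand-in for the real locator, which caches tablet -> tablet server
      // mappings and scans the metadata table on a miss or after invalidation.
      static class TabletLocatorSketch {}
    }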
The behavior we saw: all our threads were in a loop creating new MultiTableBatchWriter instances because of timeouts, and the Accumulo monitor was showing 20M/sec of queries scanning the metadata table. That made me think each instance had its own cache. Instead, either invalidate was called for some reason, or a good number of JVMs were never able to populate the cache the first time, which I think is more likely. I'm glad there is one cache :)

On Tue, Aug 30, 2016 at 3:01 PM, Keith Turner <[email protected]> wrote:
> On Tue, Aug 30, 2016 at 2:20 PM, Michael Moss <[email protected]> wrote:
> > Appreciate your input, Keith. Some inlining below.
> >
> > On 2016-08-29 16:24 (-0400), Keith Turner <[email protected]> wrote:
> >> On Fri, Aug 26, 2016 at 12:21 PM, Michael Moss <[email protected]> wrote:
> >> > Hello, Folks.
> >> >
> >> > As I look at the following tickets, I thought it might be useful to share
> >> > how we are using the BatchWriter, some of the challenges we've had, some
> >> > thoughts about its redesign and how we might get involved.
> >> >
> >> > https://issues.apache.org/jira/browse/ACCUMULO-4154
> >> > https://issues.apache.org/jira/browse/ACCUMULO-2589
> >> > https://issues.apache.org/jira/browse/ACCUMULO-2990
> >> >
> >> > One of our primary use cases of the BatchWriter is from within a Storm
> >> > topology, reading from Kafka. Generally speaking, storm might be persisting
> >> > a single or small set of mutations at a time (low latency), or in larger
> >> > batches with Trident (higher throughput). In addition to ACCUMULO-2990 (any
> >> > TimedOutException, which then throws MutationsRejectedException and
> >> > requires a new connection to be made), one of our requirements is to ensure
> >> > that any given thread's mutations are the ones which are flushed and none
> >> > others (pseudo transactions). Otherwise, we might get a failure for a
> >> > mutation which belongs to another thread (and already ACKed by Storm) which
> >> > means we don't have a 'handle' on that offset anymore in Kafka to replay
> >> > the failure - i.e. the message could be 'lost'.
> >> >
> >> > Despite being threadsafe, we end up using a single BatchWriter per thread
> >>
> >> How many threads are you having each batch writer use?
> >
> > We've tuned it to be anywhere between the default of 3, to 1-x * # of cpus.
> > The latest issue we ran into was under high write frequency, the queries
> > requesting metadata to locate tablets were being queued up on the server
> > and that was causing timeouts (in an endless loop - timeout, new
> > batchwriter, metadata query, timeout...). Increasing batching (decreasing
> > write frequency) helps with that, as well as increasing timeouts. I don't
> > know this code well yet, but it looks like there is
> > one TabletLocatorImpl.metaCache per TabletServerBatchWriter. I'm wondering
> > if maybe that metadata could instead be shared across all BatchWriters on
> > the Connector instead?
>
> Interesting, Accumulo's metadata cache is static (this is ugly) so
> batchwriters and connectors should still share this cache, unless
> there are different classloaders. I would consider it a bug if this
> were not happening. Can you provide more insight into this?
> How do you know that tablet locations are being unnecessarily reloaded
> (i.e. it should know the tablet location from a previous write by a
> different batch writer)?
>
> >
> >> > to make reasoning about the above simpler, but this creates a resource
> >> > issue - number of connections to accumulo and zk.
> >> >
> >> > This all makes me wonder what the design goals might have been for the
> >> > current version of the driver and if the efforts to rewrite it might
> >> > benefit from incorporating elements to address some of these use cases
> >> > above.
> >> >
> >> > What can we learn from how drivers for other "NoSQL" databases are
> >> > implemented? Would it make sense to remove all the global variables
> >> > ("somethingFailed"), thread sleep/notify, frequent calls to
> >> > "checkForFailures()" and consider using a 'connection pool' model where
> >> > writes are single-threaded, linearized and isolated during the connection
> >> > lease? Could we make the client non-blocking and with optional pipelining,
> >> > so multiple writes could share a connection and allow interleaving of
> >> > operations (with individual acks)?
> >>
> >> I am hoping to work on async batch writer, conditional writer, and
> >> scanners at some point. I think this may address some of the things
> >> you brought up. If I do start working on it, I will seek input as
> >> early as possible. For example if we had something that accepted a
> >> batch of mutations to write and returned a
> >> CompletableFuture<BatchResult>, then BatchResult could relay failure
> >> information only related to that batch. Also, flush() may not be
> >> needed since a user could just wait on the future if desired.
> >>
> >> I'm hoping when something like this exists in the API, optimizations
> >> like pipelining could be made later if not done when it's added.
> >>
> >> >
> >> > Looking forward to hearing everyone's thoughts.
> >> >
> >> > -Mike
> >> >
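For what it's worth, the async write API Keith describes above (a batch of mutations in, a CompletableFuture<BatchResult> out) might look roughly like the sketch below. This is only my reading of the idea, not an actual proposal; AsyncBatchWriter and BatchResult are placeholder names:

    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    import org.apache.accumulo.core.data.Mutation;

    // Placeholder interface sketching the shape of the async writer discussed
    // above; none of these names are an agreed-on Accumulo API.
    public interface AsyncBatchWriter extends AutoCloseable {

      // Submit one batch; the returned future completes when that batch (and
      // only that batch) has been written, so failures can be tied back to the
      // thread that produced the mutations. There is no shared "somethingFailed"
      // state, and no separate flush() is needed if the caller waits on the future.
      CompletableFuture<BatchResult> write(Collection<Mutation> batch);

      // Placeholder result type: reports failures only for the submitted batch.
      interface BatchResult {
        List<Mutation> failedMutations();
      }
    }

In our Storm case, each thread could then hold onto the future for its own batch and ack the Kafka offset only after that future completes, without another thread's failures leaking into its MutationsRejectedException.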
