Author: kturner
Date: Sat Jun 2 00:05:13 2012
New Revision: 1345399

URL: http://svn.apache.org/viewvc?rev=1345399&view=rev
Log:
ACCUMULO-580 changed how batch scanner buffers data from background threads

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java?rev=1345399&r1=1345398&r2=1345399&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java Sat Jun 2 00:05:13 2012 @@ -132,8 +132,10 @@ public class MetadataLocationObtainer im ResultReceiver rr = new ResultReceiver() { @Override - public void receive(Key key, Value value) { - results.put(key, value); + public void receive(List<Entry<Key,Value>> entries) { + for (Entry<Key,Value> entry : entries) { + results.put(entry.getKey(), entry.getValue()); + } } }; Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1345399&r1=1345398&r2=1345399&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java Sat Jun 2 00:05:13 2012 @@ -27,6 +27,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; import 
java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; @@ -82,8 +83,10 @@ public class TabletServerBatchReaderIter private final ExecutorService queryThreadPool; private final ScannerOptions options; - private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue = new ArrayBlockingQueue<Entry<Key,Value>>(1000); - private Entry<Key,Value> nextEntry = null; + private ArrayBlockingQueue<List<Entry<Key,Value>>> resultsQueue; + private Iterator<Entry<Key,Value>> batchIterator; + private List<Entry<Key,Value>> batch; + private static final List<Entry<Key,Value>> LAST_BATCH = new ArrayList<Map.Entry<Key,Value>>(); private Object nextLock = new Object(); private long failSleepTime = 100; @@ -91,7 +94,7 @@ public class TabletServerBatchReaderIter private volatile Throwable fatalException = null; public interface ResultReceiver { - void receive(Key key, Value value); + void receive(List<Entry<Key,Value>> entries); } private static class MyEntry implements Entry<Key,Value> { @@ -131,6 +134,7 @@ public class TabletServerBatchReaderIter this.numThreads = numThreads; this.queryThreadPool = queryThreadPool; this.options = new ScannerOptions(scannerOptions); + resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads); if (options.fetchedColumns.size() > 0) { ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size()); @@ -144,14 +148,14 @@ public class TabletServerBatchReaderIter ResultReceiver rr = new ResultReceiver() { @Override - public void receive(Key key, Value value) { + public void receive(List<Entry<Key,Value>> entries) { try { - resultsQueue.put(new MyEntry(key, value)); + resultsQueue.put(entries); } catch (InterruptedException e) { if (TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown()) - log.debug("Failed to add Batch Scan result for key " + key, e); + log.debug("Failed to add Batch Scan result", 
e); else - log.warn("Failed to add Batch Scan result for key " + key, e); + log.warn("Failed to add Batch Scan result", e); fatalException = e; throw new RuntimeException(e); @@ -169,17 +173,21 @@ public class TabletServerBatchReaderIter } } + @Override public boolean hasNext() { synchronized (nextLock) { - // check if one was cached - if (nextEntry != null) - return nextEntry.getKey() != null && nextEntry.getValue() != null; + if (batch == LAST_BATCH) + return false; + + if (batch != null && batchIterator.hasNext()) + return true; // don't have one cached, try to cache one and return success try { - while (nextEntry == null && fatalException == null && !queryThreadPool.isShutdown()) - nextEntry = resultsQueue.poll(1, TimeUnit.SECONDS); + batch = null; + while (batch == null && fatalException == null && !queryThreadPool.isShutdown()) + batch = resultsQueue.poll(1, TimeUnit.SECONDS); if (fatalException != null) if (fatalException instanceof RuntimeException) @@ -190,7 +198,8 @@ public class TabletServerBatchReaderIter if (queryThreadPool.isShutdown()) throw new RuntimeException("scanner closed"); - return nextEntry.getKey() != null && nextEntry.getValue() != null; + batchIterator = batch.iterator(); + return batch != LAST_BATCH; } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -199,17 +208,13 @@ public class TabletServerBatchReaderIter @Override public Entry<Key,Value> next() { - Entry<Key,Value> current = null; - // if there's one waiting, or hasNext() can get one, return it synchronized (nextLock) { - if (hasNext()) { - current = nextEntry; - nextEntry = null; - } + if (hasNext()) + return batchIterator.next(); + else + throw new NoSuchElementException(); } - - return current; } @Override @@ -391,22 +396,22 @@ public class TabletServerBatchReaderIter if (fatalException != null) { // we are finished with this batch query - if (!resultsQueue.offer(new MyEntry(null, null))) { + if (!resultsQueue.offer(LAST_BATCH)) { log.debug("Could not add to 
result queue after seeing fatalException in processFailures", fatalException); } } } else { // we are finished with this batch query if (fatalException != null) { - if (!resultsQueue.offer(new MyEntry(null, null))) { + if (!resultsQueue.offer(LAST_BATCH)) { log.debug("Could not add to result queue after seeing fatalException", fatalException); } } else { try { - resultsQueue.put(new MyEntry(null, null)); + resultsQueue.put(LAST_BATCH); } catch (InterruptedException e) { fatalException = e; - if (!resultsQueue.offer(new MyEntry(null, null))) { + if (!resultsQueue.offer(LAST_BATCH)) { log.debug("Could not add to result queue after seeing fatalException", fatalException); } } @@ -549,9 +554,14 @@ public class TabletServerBatchReaderIter opTimer.stop("Got 1st multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? " scanID=" + imsr.scanID : "") + " in %DURATION%"); + ArrayList<Entry<Key,Value>> entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size()); for (TKeyValue kv : scanResult.results) { - receiver.receive(new Key(kv.key), new Value(kv.value)); + entries.add(new MyEntry(new Key(kv.key), new Value(kv.value))); } + + if (entries.size() > 0) + receiver.receive(entries); + trackScanning(failures, unscanned, scanResult); while (scanResult.more) { @@ -560,9 +570,14 @@ public class TabletServerBatchReaderIter scanResult = client.continueMultiScan(null, imsr.scanID); opTimer.stop("Got more multi scan results, #results=" + scanResult.results.size() + (scanResult.more ? 
" scanID=" + imsr.scanID : "") + " in %DURATION%"); + + entries = new ArrayList<Map.Entry<Key,Value>>(scanResult.results.size()); for (TKeyValue kv : scanResult.results) { - receiver.receive(new Key(kv.key), new Value(kv.value)); + entries.add(new MyEntry(new Key(kv.key), new Value(kv.value))); } + + if (entries.size() > 0) + receiver.receive(entries); trackScanning(failures, unscanned, scanResult); } Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1345399&r1=1345398&r2=1345399&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Sat Jun 2 00:05:13 2012 @@ -223,7 +223,7 @@ public enum Property { TABLE_MINC_COMPACT_IDLETIME("table.compaction.minor.idle", "5m", PropertyType.TIMEDURATION, "After a tablet has been idle (no mutations) for this time period it may have its " + "in-memory map flushed to disk in a minor compaction. There is no guarantee an idle " + "tablet will be compacted."), - TABLE_SCAN_MAXMEM("table.scan.max.memory", "1M", PropertyType.MEMORY, + TABLE_SCAN_MAXMEM("table.scan.max.memory", "512K", PropertyType.MEMORY, "The maximum amount of memory that will be used to cache results of a client query/scan. " + "Once this limit is reached, the buffered data is sent to the client."), TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION, PropertyType.STRING, "Change the type of file a table writes"),