Author: kturner
Date: Sat Jun  2 00:05:13 2012
New Revision: 1345399

URL: http://svn.apache.org/viewvc?rev=1345399&view=rev
Log:
ACCUMULO-580 changed how batch scanner buffers data from background threads

Modified:
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
    
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java?rev=1345399&r1=1345398&r2=1345399&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/MetadataLocationObtainer.java
 Sat Jun  2 00:05:13 2012
@@ -132,8 +132,10 @@ public class MetadataLocationObtainer im
     ResultReceiver rr = new ResultReceiver() {
       
       @Override
-      public void receive(Key key, Value value) {
-        results.put(key, value);
+      public void receive(List<Entry<Key,Value>> entries) {
+        for (Entry<Key,Value> entry : entries) {
+          results.put(entry.getKey(), entry.getValue());
+        }
       }
     };
     

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1345399&r1=1345398&r2=1345399&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
 (original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
 Sat Jun  2 00:05:13 2012
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Semaphore;
@@ -82,8 +83,10 @@ public class TabletServerBatchReaderIter
   private final ExecutorService queryThreadPool;
   private final ScannerOptions options;
   
-  private ArrayBlockingQueue<Entry<Key,Value>> resultsQueue = new 
ArrayBlockingQueue<Entry<Key,Value>>(1000);
-  private Entry<Key,Value> nextEntry = null;
+  private ArrayBlockingQueue<List<Entry<Key,Value>>> resultsQueue;
+  private Iterator<Entry<Key,Value>> batchIterator;
+  private List<Entry<Key,Value>> batch;
+  private static final List<Entry<Key,Value>> LAST_BATCH = new 
ArrayList<Map.Entry<Key,Value>>();
   private Object nextLock = new Object();
   
   private long failSleepTime = 100;
@@ -91,7 +94,7 @@ public class TabletServerBatchReaderIter
   private volatile Throwable fatalException = null;
   
   public interface ResultReceiver {
-    void receive(Key key, Value value);
+    void receive(List<Entry<Key,Value>> entries);
   }
   
   private static class MyEntry implements Entry<Key,Value> {
@@ -131,6 +134,7 @@ public class TabletServerBatchReaderIter
     this.numThreads = numThreads;
     this.queryThreadPool = queryThreadPool;
     this.options = new ScannerOptions(scannerOptions);
+    resultsQueue = new ArrayBlockingQueue<List<Entry<Key,Value>>>(numThreads);
     
     if (options.fetchedColumns.size() > 0) {
       ArrayList<Range> ranges2 = new ArrayList<Range>(ranges.size());
@@ -144,14 +148,14 @@ public class TabletServerBatchReaderIter
     ResultReceiver rr = new ResultReceiver() {
       
       @Override
-      public void receive(Key key, Value value) {
+      public void receive(List<Entry<Key,Value>> entries) {
         try {
-          resultsQueue.put(new MyEntry(key, value));
+          resultsQueue.put(entries);
         } catch (InterruptedException e) {
           if 
(TabletServerBatchReaderIterator.this.queryThreadPool.isShutdown())
-            log.debug("Failed to add Batch Scan result for key " + key, e);
+            log.debug("Failed to add Batch Scan result", e);
           else
-            log.warn("Failed to add Batch Scan result for key " + key, e);
+            log.warn("Failed to add Batch Scan result", e);
           fatalException = e;
           throw new RuntimeException(e);
           
@@ -169,17 +173,21 @@ public class TabletServerBatchReaderIter
     }
   }
   
+
   @Override
   public boolean hasNext() {
     synchronized (nextLock) {
-      // check if one was cached
-      if (nextEntry != null)
-        return nextEntry.getKey() != null && nextEntry.getValue() != null;
+      if (batch == LAST_BATCH)
+        return false;
+      
+      if (batch != null && batchIterator.hasNext())
+        return true;
       
       // don't have one cached, try to cache one and return success
       try {
-        while (nextEntry == null && fatalException == null && 
!queryThreadPool.isShutdown())
-          nextEntry = resultsQueue.poll(1, TimeUnit.SECONDS);
+        batch = null;
+        while (batch == null && fatalException == null && 
!queryThreadPool.isShutdown())
+          batch = resultsQueue.poll(1, TimeUnit.SECONDS);
         
         if (fatalException != null)
           if (fatalException instanceof RuntimeException)
@@ -190,7 +198,8 @@ public class TabletServerBatchReaderIter
         if (queryThreadPool.isShutdown())
           throw new RuntimeException("scanner closed");
 
-        return nextEntry.getKey() != null && nextEntry.getValue() != null;
+        batchIterator = batch.iterator();
+        return batch != LAST_BATCH;
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
@@ -199,17 +208,13 @@ public class TabletServerBatchReaderIter
   
   @Override
   public Entry<Key,Value> next() {
-    Entry<Key,Value> current = null;
-    
     // if there's one waiting, or hasNext() can get one, return it
     synchronized (nextLock) {
-      if (hasNext()) {
-        current = nextEntry;
-        nextEntry = null;
-      }
+      if (hasNext())
+        return batchIterator.next();
+      else
+        throw new NoSuchElementException();
     }
-    
-    return current;
   }
   
   @Override
@@ -391,22 +396,22 @@ public class TabletServerBatchReaderIter
             
             if (fatalException != null) {
               // we are finished with this batch query
-              if (!resultsQueue.offer(new MyEntry(null, null))) {
+              if (!resultsQueue.offer(LAST_BATCH)) {
                 log.debug("Could not add to result queue after seeing 
fatalException in processFailures", fatalException);
               }
             }
           } else {
             // we are finished with this batch query
             if (fatalException != null) {
-              if (!resultsQueue.offer(new MyEntry(null, null))) {
+              if (!resultsQueue.offer(LAST_BATCH)) {
                 log.debug("Could not add to result queue after seeing 
fatalException", fatalException);
               }
             } else {
               try {
-                resultsQueue.put(new MyEntry(null, null));
+                resultsQueue.put(LAST_BATCH);
               } catch (InterruptedException e) {
                 fatalException = e;
-                if (!resultsQueue.offer(new MyEntry(null, null))) {
+                if (!resultsQueue.offer(LAST_BATCH)) {
                   log.debug("Could not add to result queue after seeing 
fatalException", fatalException);
                 }
               }
@@ -549,9 +554,14 @@ public class TabletServerBatchReaderIter
         opTimer.stop("Got 1st multi scan results, #results=" + 
scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
             + " in %DURATION%");
         
+        ArrayList<Entry<Key,Value>> entries = new 
ArrayList<Map.Entry<Key,Value>>(scanResult.results.size());
         for (TKeyValue kv : scanResult.results) {
-          receiver.receive(new Key(kv.key), new Value(kv.value));
+          entries.add(new MyEntry(new Key(kv.key), new Value(kv.value)));
         }
+        
+        if (entries.size() > 0)
+          receiver.receive(entries);
+
         trackScanning(failures, unscanned, scanResult);
         
         while (scanResult.more) {
@@ -560,9 +570,14 @@ public class TabletServerBatchReaderIter
           scanResult = client.continueMultiScan(null, imsr.scanID);
           opTimer.stop("Got more multi scan results, #results=" + 
scanResult.results.size() + (scanResult.more ? "  scanID=" + imsr.scanID : "")
               + " in %DURATION%");
+          
+          entries = new 
ArrayList<Map.Entry<Key,Value>>(scanResult.results.size());
           for (TKeyValue kv : scanResult.results) {
-            receiver.receive(new Key(kv.key), new Value(kv.value));
+            entries.add(new MyEntry(new Key(kv.key), new Value(kv.value)));
           }
+          
+          if (entries.size() > 0)
+            receiver.receive(entries);
           trackScanning(failures, unscanned, scanResult);
         }
         

Modified: 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: 
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1345399&r1=1345398&r2=1345399&view=diff
==============================================================================
--- 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
(original)
+++ 
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
Sat Jun  2 00:05:13 2012
@@ -223,7 +223,7 @@ public enum Property {
   TABLE_MINC_COMPACT_IDLETIME("table.compaction.minor.idle", "5m", 
PropertyType.TIMEDURATION,
       "After a tablet has been idle (no mutations) for this time period it may 
have its "
           + "in-memory map flushed to disk in a minor compaction.  There is no 
guarantee an idle " + "tablet will be compacted."),
-  TABLE_SCAN_MAXMEM("table.scan.max.memory", "1M", PropertyType.MEMORY,
+  TABLE_SCAN_MAXMEM("table.scan.max.memory", "512K", PropertyType.MEMORY,
       "The maximum amount of memory that will be used to cache results of a 
client query/scan. "
           + "Once this limit is reached, the buffered data is sent to the 
client."),
   TABLE_FILE_TYPE("table.file.type", RFile.EXTENSION, PropertyType.STRING, 
"Change the type of file a table writes"),


Reply via email to