Repository: phoenix
Updated Branches:
  refs/heads/3.0 7a9fd9981 -> b51318d09


PHOENIX-1216 Fix code: change spooling directory

Make the spooling directory configurable.

Signed-off-by: Gabriel Reid <gabri...@ngdata.com>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b51318d0
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b51318d0
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b51318d0

Branch: refs/heads/3.0
Commit: b51318d09bd450c906e7b65348d54ea73151099f
Parents: 7a9fd99
Author: sofangel <sofan...@naver.com>
Authored: Tue Sep 16 15:31:34 2014 +0900
Committer: Gabriel Reid <gabri...@ngdata.com>
Committed: Tue Sep 16 12:06:24 2014 +0200

----------------------------------------------------------------------
 .../phoenix/iterate/SpoolingResultIterator.java | 59 ++++++++++----------
 .../org/apache/phoenix/query/QueryServices.java |  2 +-
 .../phoenix/query/QueryServicesOptions.java     | 11 +++-
 .../iterate/SpoolingResultIteratorTest.java     |  2 +-
 .../phoenix/query/QueryServicesTestImpl.java    |  2 +
 5 files changed, 43 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
index 22cd049..f35999e 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java
@@ -47,19 +47,19 @@ import org.apache.phoenix.util.TupleUtil;
 
 
 /**
- * 
+ *
  * Result iterator that spools the results of a scan to disk once an in-memory 
threshold has been reached.
  * If the in-memory threshold is not reached, the results are held in memory 
with no disk writing performed.
  *
- * 
+ *
  * @since 0.1
  */
 public class SpoolingResultIterator implements PeekingResultIterator {
     private final PeekingResultIterator spoolFrom;
-    
+
     public static class SpoolingResultIteratorFactory implements 
ParallelIteratorFactory {
         private final QueryServices services;
-        
+
         public SpoolingResultIteratorFactory(QueryServices services) {
             this.services = services;
         }
@@ -67,15 +67,16 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
         public PeekingResultIterator newIterator(StatementContext context, 
ResultIterator scanner, Scan scan) throws SQLException {
             return new SpoolingResultIterator(scanner, services);
         }
-        
+
     }
 
     public SpoolingResultIterator(ResultIterator scanner, QueryServices 
services) throws SQLException {
-        this (scanner, services.getMemoryManager(), 
-                       
services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
-                       
services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES));
+        this (scanner, services.getMemoryManager(),
+                
services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES),
+                
services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, 
QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES),
+                services.getProps().get(QueryServices.SPOOL_DIRECTORY, 
QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY));
     }
-    
+
     /**
     * Create a result iterator by iterating through the results of a scan, 
spooling them to disk once
     * a threshold has been reached. The scanner passed in is closed prior to 
returning.
@@ -85,7 +86,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
     *  the memory manager) is exceeded.
     * @throws SQLException
     */
-    SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int 
thresholdBytes, final long maxSpoolToDisk) throws SQLException {
+    SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int 
thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws 
SQLException {
         boolean success = false;
         boolean usedOnDiskIterator = false;
         final MemoryChunk chunk = mm.allocate(0, thresholdBytes);
@@ -93,7 +94,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
         try {
             // Can't be bigger than int, since it's the max of the above 
allocation
             int size = (int)chunk.getSize();
-            tempFile = File.createTempFile("ResultSpooler",".bin");
+            tempFile = File.createTempFile("ResultSpooler",".bin", new 
File(spoolDirectory));
             DeferredFileOutputStream spoolTo = new 
DeferredFileOutputStream(size, tempFile) {
                 @Override
                 protected void thresholdReached() throws IOException {
@@ -102,7 +103,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
                 }
             };
             DataOutputStream out = new DataOutputStream(spoolTo);
-            final long maxBytesAllowed = maxSpoolToDisk == -1 ? 
+            final long maxBytesAllowed = maxSpoolToDisk == -1 ?
                        Long.MAX_VALUE : thresholdBytes + maxSpoolToDisk;
             long bytesWritten = 0L;
             int maxSize = 0;
@@ -152,17 +153,17 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
     public Tuple next() throws SQLException {
         return spoolFrom.next();
     }
-    
+
     @Override
     public void close() throws SQLException {
         spoolFrom.close();
     }
 
     /**
-     * 
+     *
      * Backing result iterator if it was not necessary to spool results to 
disk.
      *
-     * 
+     *
      * @since 0.1
      */
     private static class InMemoryResultIterator implements 
PeekingResultIterator {
@@ -170,7 +171,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
         private final byte[] bytes;
         private Tuple next;
         private int offset;
-        
+
         private InMemoryResultIterator(byte[] bytes, MemoryChunk memoryChunk) 
throws SQLException {
             this.bytes = bytes;
             this.memoryChunk = memoryChunk;
@@ -188,7 +189,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
             Tuple result = new ResultTuple(new Result(value));
             return next = result;
         }
-        
+
         @Override
         public Tuple peek() throws SQLException {
             return next;
@@ -200,7 +201,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
             advance();
             return current;
         }
-        
+
         @Override
         public void close() {
             memoryChunk.close();
@@ -210,12 +211,12 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
         public void explain(List<String> planSteps) {
         }
     }
-    
+
     /**
-     * 
+     *
      * Backing result iterator if results were spooled to disk
      *
-     * 
+     *
      * @since 0.1
      */
     private static class OnDiskResultIterator implements PeekingResultIterator 
{
@@ -226,12 +227,12 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
         private int bufferIndex;
         private byte[][] buffers = new byte[2][];
         private boolean isClosed;
-        
+
         private OnDiskResultIterator (int maxSize, File file) {
             this.file = file;
             this.maxSize = maxSize;
         }
-        
+
         private synchronized void init() throws IOException {
             if (spoolFrom == null) {
                 spoolFrom = new DataInputStream(new BufferedInputStream(new 
FileInputStream(file)));
@@ -241,7 +242,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
                 advance();
             }
         }
-    
+
         private synchronized void reachedEnd() throws IOException {
             next = null;
             isClosed = true;
@@ -253,7 +254,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
                 file.delete();
             }
         }
-        
+
         private synchronized Tuple advance() throws IOException {
             if (isClosed) {
                 return next;
@@ -282,7 +283,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
             next = new ResultTuple(new Result(new 
ImmutableBytesWritable(buffer,0,length)));
             return next;
         }
-        
+
         @Override
         public synchronized Tuple peek() throws SQLException {
             try {
@@ -292,7 +293,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
                 throw ServerUtil.parseServerException(e);
             }
         }
-    
+
         @Override
         public synchronized Tuple next() throws SQLException {
             try {
@@ -304,7 +305,7 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
                 throw ServerUtil.parseServerException(e);
             }
         }
-        
+
         @Override
         public synchronized void close() throws SQLException {
             try {
@@ -324,4 +325,4 @@ public class SpoolingResultIterator implements 
PeekingResultIterator {
     @Override
     public void explain(List<String> planSteps) {
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 3e0e461..da35626 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -46,7 +46,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String SPOOL_THRESHOLD_BYTES_ATTRIB = 
"phoenix.query.spoolThresholdBytes";
     public static final String HBASE_CLIENT_KEYTAB = "hbase.myclient.keytab";
     public static final String HBASE_CLIENT_PRINCIPAL = 
"hbase.myclient.principal";
-
+    public static final String SPOOL_DIRECTORY = "phoenix.spool.directory";
     
     /**
         * max size to spool the result into

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 04f31e6..78681a0 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -48,6 +48,7 @@ import static 
org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE;
 import static 
org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY;
 import static 
org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB;
@@ -76,6 +77,7 @@ public class QueryServicesOptions {
        public static final int DEFAULT_QUEUE_SIZE = 500;
        public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min
        public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 
20; // 20m
+    public static final String DEFAULT_SPOOL_DIRECTORY = "/tmp";
        public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap
        public static final int DEFAULT_MAX_MEMORY_WAIT_MS = 10000;
        public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100;
@@ -159,6 +161,7 @@ public class QueryServicesOptions {
             .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE)
             .setIfUnset(THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS)
             .setIfUnset(SPOOL_THRESHOLD_BYTES_ATTRIB, 
DEFAULT_SPOOL_THRESHOLD_BYTES)
+            .setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY)
             .setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC)
             .setIfUnset(MAX_MEMORY_WAIT_MS_ATTRIB, DEFAULT_MAX_MEMORY_WAIT_MS)
             .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, 
DEFAULT_MAX_TENANT_MEMORY_PERC)
@@ -213,7 +216,7 @@ public class QueryServicesOptions {
         config.setIfUnset(name, Long.toString(value));
         return this;
     }
-    
+
     private QueryServicesOptions setIfUnset(String name, String value) {
         config.setIfUnset(name, value);
         return this;
@@ -239,7 +242,11 @@ public class QueryServicesOptions {
     public QueryServicesOptions setSpoolThresholdBytes(int 
spoolThresholdBytes) {
         return set(SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes);
     }
-    
+
+    public QueryServicesOptions setSpoolDirectory(String spoolDirectory) {
+        return set(SPOOL_DIRECTORY, spoolDirectory);
+    }
+
     public QueryServicesOptions setMaxMemoryPerc(int maxMemoryPerc) {
         return set(MAX_MEMORY_PERC_ATTRIB, maxMemoryPerc);
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
index e1b6864..ab6a4a7 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java
@@ -52,7 +52,7 @@ public class SpoolingResultIteratorTest {
             };
 
         MemoryManager memoryManager = new DelegatingMemoryManager(new 
GlobalMemoryManager(threshold, 0));
-        ResultIterator scanner = new SpoolingResultIterator(iterator, 
memoryManager, threshold, maxSizeSpool);
+        ResultIterator scanner = new SpoolingResultIterator(iterator, 
memoryManager, threshold, maxSizeSpool,"/tmp");
         AssertResults.assertResults(scanner, expectedResults);
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index e125755..47f5b1b 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.phoenix.query;
 
+import static 
org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY;
 import static org.apache.phoenix.query.QueryServicesOptions.withDefaults;
 
 import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec;
@@ -64,6 +65,7 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
                 .setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC)
                 .setThreadTimeoutMs(DEFAULT_THREAD_TIMEOUT_MS)
                 .setSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES)
+                .setSpoolDirectory(DEFAULT_SPOOL_DIRECTORY)
                 .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS)
                 .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC)
                 .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE)

Reply via email to