Repository: phoenix Updated Branches: refs/heads/3.0 7a9fd9981 -> b51318d09
PHOENIX-1216 fix code Change spooling directory Make the spooling directory configurable. Signed-off-by: Gabriel Reid <gabri...@ngdata.com> Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b51318d0 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b51318d0 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b51318d0 Branch: refs/heads/3.0 Commit: b51318d09bd450c906e7b65348d54ea73151099f Parents: 7a9fd99 Author: sofangel <sofan...@naver.com> Authored: Tue Sep 16 15:31:34 2014 +0900 Committer: Gabriel Reid <gabri...@ngdata.com> Committed: Tue Sep 16 12:06:24 2014 +0200 ---------------------------------------------------------------------- .../phoenix/iterate/SpoolingResultIterator.java | 59 ++++++++++---------- .../org/apache/phoenix/query/QueryServices.java | 2 +- .../phoenix/query/QueryServicesOptions.java | 11 +++- .../iterate/SpoolingResultIteratorTest.java | 2 +- .../phoenix/query/QueryServicesTestImpl.java | 2 + 5 files changed, 43 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java index 22cd049..f35999e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java @@ -47,19 +47,19 @@ import org.apache.phoenix.util.TupleUtil; /** - * + * * Result iterator that spools the results of a scan to disk once an in-memory threshold has been reached. * If the in-memory threshold is not reached, the results are held in memory with no disk writing perfomed. * - * + * * @since 0.1 */ public class SpoolingResultIterator implements PeekingResultIterator { private final PeekingResultIterator spoolFrom; - + public static class SpoolingResultIteratorFactory implements ParallelIteratorFactory { private final QueryServices services; - + public SpoolingResultIteratorFactory(QueryServices services) { this.services = services; } @@ -67,15 +67,16 @@ public class SpoolingResultIterator implements PeekingResultIterator { public PeekingResultIterator newIterator(StatementContext context, ResultIterator scanner, Scan scan) throws SQLException { return new SpoolingResultIterator(scanner, services); } - + } public SpoolingResultIterator(ResultIterator scanner, QueryServices services) throws SQLException { - this (scanner, services.getMemoryManager(), - services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES), - services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES)); + this (scanner, services.getMemoryManager(), + services.getProps().getInt(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_SPOOL_THRESHOLD_BYTES), + services.getProps().getLong(QueryServices.MAX_SPOOL_TO_DISK_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_SPOOL_TO_DISK_BYTES), + services.getProps().get(QueryServices.SPOOL_DIRECTORY, QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY)); } - + /** * Create a result iterator by iterating through the results of a scan, spooling them to disk once * a threshold has been reached. The scanner passed in is closed prior to returning. @@ -85,7 +86,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { * the memory manager) is exceeded. * @throws SQLException */ - SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk) throws SQLException { + SpoolingResultIterator(ResultIterator scanner, MemoryManager mm, final int thresholdBytes, final long maxSpoolToDisk, final String spoolDirectory) throws SQLException { boolean success = false; boolean usedOnDiskIterator = false; final MemoryChunk chunk = mm.allocate(0, thresholdBytes); @@ -93,7 +94,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { try { // Can't be bigger than int, since it's the max of the above allocation int size = (int)chunk.getSize(); - tempFile = File.createTempFile("ResultSpooler",".bin"); + tempFile = File.createTempFile("ResultSpooler",".bin", new File(spoolDirectory)); DeferredFileOutputStream spoolTo = new DeferredFileOutputStream(size, tempFile) { @Override protected void thresholdReached() throws IOException { @@ -102,7 +103,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { } }; DataOutputStream out = new DataOutputStream(spoolTo); - final long maxBytesAllowed = maxSpoolToDisk == -1 ? + final long maxBytesAllowed = maxSpoolToDisk == -1 ? Long.MAX_VALUE : thresholdBytes + maxSpoolToDisk; long bytesWritten = 0L; int maxSize = 0; @@ -152,17 +153,17 @@ public class SpoolingResultIterator implements PeekingResultIterator { public Tuple next() throws SQLException { return spoolFrom.next(); } - + @Override public void close() throws SQLException { spoolFrom.close(); } /** - * + * * Backing result iterator if it was not necessary to spool results to disk. * - * + * * @since 0.1 */ private static class InMemoryResultIterator implements PeekingResultIterator { @@ -170,7 +171,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { private final byte[] bytes; private Tuple next; private int offset; - + private InMemoryResultIterator(byte[] bytes, MemoryChunk memoryChunk) throws SQLException { this.bytes = bytes; this.memoryChunk = memoryChunk; @@ -188,7 +189,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { Tuple result = new ResultTuple(new Result(value)); return next = result; } - + @Override public Tuple peek() throws SQLException { return next; @@ -200,7 +201,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { advance(); return current; } - + @Override public void close() { memoryChunk.close(); @@ -210,12 +211,12 @@ public class SpoolingResultIterator implements PeekingResultIterator { public void explain(List<String> planSteps) { } } - + /** - * + * * Backing result iterator if results were spooled to disk * - * + * * @since 0.1 */ private static class OnDiskResultIterator implements PeekingResultIterator { @@ -226,12 +227,12 @@ public class SpoolingResultIterator implements PeekingResultIterator { private int bufferIndex; private byte[][] buffers = new byte[2][]; private boolean isClosed; - + private OnDiskResultIterator (int maxSize, File file) { this.file = file; this.maxSize = maxSize; } - + private synchronized void init() throws IOException { if (spoolFrom == null) { spoolFrom = new DataInputStream(new BufferedInputStream(new FileInputStream(file))); @@ -241,7 +242,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { advance(); } } - + private synchronized void reachedEnd() throws IOException { next = null; isClosed = true; @@ -253,7 +254,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { file.delete(); } } - + private synchronized Tuple advance() throws IOException { if (isClosed) { return next; @@ -282,7 +283,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { next = new ResultTuple(new Result(new ImmutableBytesWritable(buffer,0,length))); return next; } - + @Override public synchronized Tuple peek() throws SQLException { try { @@ -292,7 +293,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { throw ServerUtil.parseServerException(e); } } - + @Override public synchronized Tuple next() throws SQLException { try { @@ -304,7 +305,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { throw ServerUtil.parseServerException(e); } } - + @Override public synchronized void close() throws SQLException { try { @@ -324,4 +325,4 @@ public class SpoolingResultIterator implements PeekingResultIterator { @Override public void explain(List<String> planSteps) { } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index 3e0e461..da35626 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -46,7 +46,7 @@ public interface QueryServices extends SQLCloseable { public static final String SPOOL_THRESHOLD_BYTES_ATTRIB = "phoenix.query.spoolThresholdBytes"; public static final String HBASE_CLIENT_KEYTAB = "hbase.myclient.keytab"; public static final String HBASE_CLIENT_PRINCIPAL = "hbase.myclient.principal"; - + public static final String SPOOL_DIRECTORY = "phoenix.spool.directory"; /** * max size to spool the the result into http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 04f31e6..78681a0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -48,6 +48,7 @@ import static org.apache.phoenix.query.QueryServices.RPC_TIMEOUT_ATTRIB; import static org.apache.phoenix.query.QueryServices.SCAN_CACHE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.SCAN_RESULT_CHUNK_SIZE; import static org.apache.phoenix.query.QueryServices.SEQUENCE_CACHE_SIZE_ATTRIB; +import static org.apache.phoenix.query.QueryServices.SPOOL_DIRECTORY; import static org.apache.phoenix.query.QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.STATS_UPDATE_FREQ_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.TARGET_QUERY_CONCURRENCY_ATTRIB; @@ -76,6 +77,7 @@ public class QueryServicesOptions { public static final int DEFAULT_QUEUE_SIZE = 500; public static final int DEFAULT_THREAD_TIMEOUT_MS = 600000; // 10min public static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024 * 20; // 20m + public static final String DEFAULT_SPOOL_DIRECTORY = "/tmp"; public static final int DEFAULT_MAX_MEMORY_PERC = 15; // 15% of heap public static final int DEFAULT_MAX_MEMORY_WAIT_MS = 10000; public static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100; @@ -159,6 +161,7 @@ public class QueryServicesOptions { .setIfUnset(QUEUE_SIZE_ATTRIB, DEFAULT_QUEUE_SIZE) .setIfUnset(THREAD_TIMEOUT_MS_ATTRIB, DEFAULT_THREAD_TIMEOUT_MS) .setIfUnset(SPOOL_THRESHOLD_BYTES_ATTRIB, DEFAULT_SPOOL_THRESHOLD_BYTES) + .setIfUnset(SPOOL_DIRECTORY, DEFAULT_SPOOL_DIRECTORY) .setIfUnset(MAX_MEMORY_PERC_ATTRIB, DEFAULT_MAX_MEMORY_PERC) .setIfUnset(MAX_MEMORY_WAIT_MS_ATTRIB, DEFAULT_MAX_MEMORY_WAIT_MS) .setIfUnset(MAX_TENANT_MEMORY_PERC_ATTRIB, DEFAULT_MAX_TENANT_MEMORY_PERC) @@ -213,7 +216,7 @@ public class QueryServicesOptions { config.setIfUnset(name, Long.toString(value)); return this; } - + private QueryServicesOptions setIfUnset(String name, String value) { config.setIfUnset(name, value); return this; @@ -239,7 +242,11 @@ public class QueryServicesOptions { public QueryServicesOptions setSpoolThresholdBytes(int spoolThresholdBytes) { return set(SPOOL_THRESHOLD_BYTES_ATTRIB, spoolThresholdBytes); } - + + public QueryServicesOptions setSpoolDirectory(String spoolDirectory) { + return set(SPOOL_DIRECTORY, spoolDirectory); + } + public QueryServicesOptions setMaxMemoryPerc(int maxMemoryPerc) { return set(MAX_MEMORY_PERC_ATTRIB, maxMemoryPerc); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java index e1b6864..ab6a4a7 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/SpoolingResultIteratorTest.java @@ -52,7 +52,7 @@ public class SpoolingResultIteratorTest { }; MemoryManager memoryManager = new DelegatingMemoryManager(new GlobalMemoryManager(threshold, 0)); - ResultIterator scanner = new SpoolingResultIterator(iterator, memoryManager, threshold, maxSizeSpool); + ResultIterator scanner = new SpoolingResultIterator(iterator, memoryManager, threshold, maxSizeSpool,"/tmp"); AssertResults.assertResults(scanner, expectedResults); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b51318d0/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index e125755..47f5b1b 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -17,6 +17,7 @@ */ package org.apache.phoenix.query; +import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_SPOOL_DIRECTORY; import static org.apache.phoenix.query.QueryServicesOptions.withDefaults; import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec; @@ -64,6 +65,7 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { .setMaxMemoryPerc(DEFAULT_MAX_MEMORY_PERC) .setThreadTimeoutMs(DEFAULT_THREAD_TIMEOUT_MS) .setSpoolThresholdBytes(DEFAULT_SPOOL_THRESHOLD_BYTES) + .setSpoolDirectory(DEFAULT_SPOOL_DIRECTORY) .setMaxMemoryWaitMs(DEFAULT_MAX_MEMORY_WAIT_MS) .setMaxTenantMemoryPerc(DEFAULT_MAX_TENANT_MEMORY_PERC) .setMaxServerCacheSize(DEFAULT_MAX_HASH_CACHE_SIZE)