Repository: phoenix Updated Branches: refs/heads/3.0 c77120e98 -> b7cb3c836
PHOENIX-1456 Incorrect query results caused by reusing buffers in SpoolingResultIterator Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b7cb3c83 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b7cb3c83 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b7cb3c83 Branch: refs/heads/3.0 Commit: b7cb3c836873f627d27d5db939e06c6ffb0e376e Parents: c77120e Author: maryannxue <maryann...@apache.org> Authored: Wed Nov 19 12:32:39 2014 -0500 Committer: maryannxue <maryann...@apache.org> Committed: Wed Nov 19 12:32:39 2014 -0500 ---------------------------------------------------------------------- .../phoenix/end2end/SpooledSortMergeJoinIT.java | 46 ++++++++++++++++++++ .../phoenix/iterate/SpoolingResultIterator.java | 17 ++------ 2 files changed, 49 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7cb3c83/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledSortMergeJoinIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledSortMergeJoinIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledSortMergeJoinIT.java new file mode 100644 index 0000000..d602423 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpooledSortMergeJoinIT.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import java.util.Map; + +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +import com.google.common.collect.Maps; + +@Category(HBaseManagedTimeTest.class) +public class SpooledSortMergeJoinIT extends SortMergeJoinIT { + + public SpooledSortMergeJoinIT(String[] indexDDL, String[] plans) { + super(indexDDL, plans); + } + + @BeforeClass + @Shadower(classBeingShadowed = BaseClientManagedTimeIT.class) + public static void doSetup() throws Exception { + Map<String,String> props = Maps.newHashMapWithExpectedSize(1);; + props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(100)); + // Must update config before starting server + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } + +} + http://git-wip-us.apache.org/repos/asf/phoenix/blob/b7cb3c83/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java index 3d98a2c..d42b59d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingResultIterator.java @@ -103,21 +103,19 @@ public class SpoolingResultIterator implements PeekingResultIterator { final long maxBytesAllowed = maxSpoolToDisk == -1 ? Long.MAX_VALUE : thresholdBytes + maxSpoolToDisk; long bytesWritten = 0L; - int maxSize = 0; for (Tuple result = scanner.next(); result != null; result = scanner.next()) { int length = TupleUtil.write(result, out); bytesWritten += length; if(bytesWritten > maxBytesAllowed){ throw new SpoolTooBigToDiskException("result too big, max allowed(bytes): " + maxBytesAllowed); } - maxSize = Math.max(length, maxSize); } if (spoolTo.isInMemory()) { byte[] data = spoolTo.getData(); chunk.resize(data.length); spoolFrom = new InMemoryResultIterator(data, chunk); } else { - spoolFrom = new OnDiskResultIterator(maxSize, spoolTo.getFile()); + spoolFrom = new OnDiskResultIterator(spoolTo.getFile()); } success = true; } catch (IOException e) { @@ -223,22 +221,15 @@ public class SpoolingResultIterator implements PeekingResultIterator { private final File file; private DataInputStream spoolFrom; private Tuple next; - private int maxSize; - private int bufferIndex; - private byte[][] buffers = new byte[2][]; private boolean isClosed; - private OnDiskResultIterator (int maxSize, File file) { + private OnDiskResultIterator (File file) { this.file = file; - this.maxSize = maxSize; } private synchronized void init() throws IOException { if (spoolFrom == null) { spoolFrom = new DataInputStream(new BufferedInputStream(new FileInputStream(file))); - // We need two so that we can have a current and a next without them stomping on each other - buffers[0] = new byte[maxSize]; - buffers[1] = new byte[maxSize]; advance(); } } @@ -268,9 +259,7 @@ public class SpoolingResultIterator implements PeekingResultIterator { } int totalBytesRead = 0; int offset = 0; - // Alternate between buffers so that the current one is not affected by advancing - bufferIndex = (bufferIndex + 1) % 2; - byte[] buffer = buffers [bufferIndex]; + byte[] buffer = new byte[length]; while(totalBytesRead < length) { int bytesRead = spoolFrom.read(buffer, offset, length); if (bytesRead == -1) {