[
https://issues.apache.org/jira/browse/PHOENIX-2405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15363646#comment-15363646
]
ASF GitHub Bot commented on PHOENIX-2405:
-----------------------------------------
Github user maryannxue commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/175#discussion_r69668023
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/iterate/SpoolingByteBufferSegmentQueue.java
---
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import org.apache.commons.io.input.CountingInputStream;
+import org.apache.commons.io.output.DeferredFileOutputStream;
+
+import org.apache.phoenix.memory.MemoryManager;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+
+
+import java.io.*;
+import java.util.AbstractQueue;
+import java.util.Iterator;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.phoenix.monitoring.GlobalClientMetrics.*;
+
+
+public abstract class SpoolingByteBufferSegmentQueue<T> extends AbstractQueue<T> {
+
+ private ResultQueue<T> spoolFrom;
+
+ private boolean closed;
+ private boolean flushed;
+ private DeferredFileOutputStream spoolTo;
+ private MemoryChunk chunk;
+ private int size = 0;
+ private long inMemByteSize = 0L;
+ private int index;
+
+
+
+ SpoolingByteBufferSegmentQueue(int index, MemoryManager mm, final int thresholdBytes, String spoolDirectory) {
+ this.index = index;
+
+ long startTime = System.currentTimeMillis();
+ chunk = mm.allocate(0, thresholdBytes);
+ long waitTime = System.currentTimeMillis() - startTime;
+ GLOBAL_MEMORY_WAIT_TIME.update(waitTime);
+
+ int size = (int)chunk.getSize();
+ spoolTo = new DeferredFileOutputStream(size, "ResultSpooler", ".bin", new File(spoolDirectory)) {
+ @Override
+ protected void thresholdReached() throws IOException {
+ try {
+ super.thresholdReached();
+ } finally {
+ chunk.close();
+ }
+ }
+ };
+
+
+ }
+
+ public int index() {
+ return this.index;
+ }
+
+
+
+ protected abstract InMemoryResultQueue<T> createInMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk);
+
+ protected abstract OnDiskResultQueue<T> createOnDiskResultQueue(File file);
+
+ @Override
+ public boolean offer(T t) {
+ if (closed || flushed){
+ return false;
+ }
+ boolean result = writeRecord(t, spoolTo);
+ if(result){
+ if(!spoolTo.isInMemory()){
+ flushToDisk();
+ }
+ size++;
+ }
+
+
+ return result;
+ }
+
+ protected abstract boolean writeRecord(T t, OutputStream outputStream);
+
+ private void flushToMemory(){
+ byte[] data = spoolTo.getData();
+ chunk.resize(data.length);
+ spoolFrom = createInMemoryResultQueue(data, chunk);
+ GLOBAL_MEMORY_CHUNK_BYTES.update(data.length);
+ flushed = true;
+ }
+
+
+ private void flushToDisk(){
+ long sizeOfSpoolFile = spoolTo.getFile().length();
+ GLOBAL_SPOOL_FILE_SIZE.update(sizeOfSpoolFile);
+ GLOBAL_SPOOL_FILE_COUNTER.increment();
+ spoolFrom = createOnDiskResultQueue(spoolTo.getFile());
+ if (spoolTo.getFile() != null) {
+ spoolTo.getFile().deleteOnExit();
+ }
+ inMemByteSize = 0;
+ flushed = true;
+ }
+
+
+ public boolean isFlushed(){
+ return flushed;
+ }
+
+ @Override
+ public T peek() {
+ if(!flushed){
+ flushToMemory();
+ }
+ return spoolFrom.peek();
+ }
+
+ @Override
+ public T poll() {
+ if(!flushed){
+ flushToMemory();
+ }
+ return spoolFrom.poll();
+ }
+
+ public void close() throws IOException {
+ closed = true;
+ if(spoolFrom != null){
+ spoolFrom.close();
+ }
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ if(!flushed){
+ flushToMemory();
+ }
+ return spoolFrom.iterator();
+ }
+
+ @Override
+ public int size() {
+ return size;
+ }
+
+ public long getInMemByteSize(){
+ return inMemByteSize;
+ };
+
+ private static abstract class ResultQueue<T> extends AbstractQueue<T> implements Closeable {}
+
+ protected static abstract class InMemoryResultQueue<T> extends ResultQueue<T> {
+ private final MemoryChunk memoryChunk;
+ protected final byte[] bytes;
+ private T next;
+ private AtomicInteger offset = new AtomicInteger(0);
+
+ protected InMemoryResultQueue(byte[] bytes, MemoryChunk memoryChunk) {
+ this.bytes = bytes;
+ this.memoryChunk = memoryChunk;
+ next = advance(offset);
+ }
+
+ protected abstract T advance(AtomicInteger offset);
+
+ @Override
+ public boolean offer(T t) {
+ return false;
+ }
+
+ @Override
+ public T peek(){
+ return next;
+ }
+
+ @Override
+ public T poll() {
+ T current = next;
+ next = advance(offset);
+ return current;
+ }
+
+
+ public void close() {
+ memoryChunk.close();
+ }
+
+
+ @Override
+ public Iterator<T> iterator() {
+ return new Iterator<T>(){
+ AtomicInteger iteratorOffset = new AtomicInteger(offset.get());
+ private T next = advance(iteratorOffset);
--- End diff --
One reminder: I said in my previous review that we'd better avoid
discarding an old XXXSegmentQueue and starting a new one every time we
switch between these segment queues. So we should ultimately remove this
offset handling.
> Improve performance and stability of server side sort for ORDER BY
> ------------------------------------------------------------------
>
> Key: PHOENIX-2405
> URL: https://issues.apache.org/jira/browse/PHOENIX-2405
> Project: Phoenix
> Issue Type: Bug
> Reporter: James Taylor
> Assignee: Haoran Zhang
> Labels: gsoc2016
> Fix For: 4.8.0
>
>
> We currently use memory mapped files to buffer data as it's being sorted in
> an ORDER BY (see MappedByteBufferQueue). The following types of exceptions
> have been seen to occur:
> {code}
> Caused by: java.lang.OutOfMemoryError: Map failed
> at sun.nio.ch.FileChannelImpl.map0(Native Method)
> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:904)
> {code}
> [~apurtell] has read that memory-mapped files are not cleaned up very
> well in Java:
> {quote}
> "Map failed" means the JVM ran out of virtual address space. If you search
> around stack overflow for suggestions on what to do when your app (in this
> case Phoenix) encounters this issue when using mapped buffers, the answers
> tend toward manually cleaning up the mapped buffers or explicitly triggering
> a full GC. See
> http://stackoverflow.com/questions/8553158/prevent-outofmemory-when-using-java-nio-mappedbytebuffer
> for example. There are apparently long standing JVM/JRE problems with
> reclamation of mapped buffers. I think we may want to explore in Phoenix a
> different way to achieve what the current code is doing.
> {quote}
> Instead of using memory mapped files, we could use heap memory, or perhaps
> there are other mechanisms too.
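> As an illustration of the heap-memory alternative, here is a minimal,
> hypothetical stdlib-only sketch (the actual patch in this PR uses Commons
> IO's DeferredFileOutputStream instead): records are buffered on the heap
> up to a threshold, then spilled to an ordinary temp file, so no
> MappedByteBuffer and no reliance on JVM unmapping behavior is involved.
> Class and method names below are invented for the example.

```java
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch: writes accumulate in a heap buffer until
// thresholdBytes would be exceeded, then everything buffered so far is
// flushed to a regular temp file and later writes go straight to disk.
class SpillableBuffer implements Closeable {
    private final int thresholdBytes;
    private ByteArrayOutputStream memBuffer = new ByteArrayOutputStream();
    private OutputStream diskOut; // non-null once we have spilled
    private File spillFile;

    SpillableBuffer(int thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    boolean isInMemory() {
        return diskOut == null;
    }

    void write(byte[] record) throws IOException {
        if (diskOut == null && memBuffer.size() + record.length > thresholdBytes) {
            // Threshold reached: spill the heap buffer to a plain file,
            // analogous to DeferredFileOutputStream.thresholdReached().
            spillFile = File.createTempFile("ResultSpooler", ".bin");
            spillFile.deleteOnExit();
            diskOut = new FileOutputStream(spillFile);
            memBuffer.writeTo(diskOut);
            memBuffer = null; // release the heap copy
        }
        if (diskOut != null) {
            diskOut.write(record);
        } else {
            memBuffer.write(record, 0, record.length);
        }
    }

    @Override
    public void close() throws IOException {
        if (diskOut != null) {
            diskOut.close();
        }
    }
}
```

> A real implementation would additionally account the in-memory bytes
> against a MemoryManager chunk, as the patch does, so concurrent sorts
> share a global memory budget.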
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)