Github user maryannxue commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/171#discussion_r65294053
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferSegmentQueue.java
---
@@ -0,0 +1,251 @@
+package org.apache.phoenix.iterate;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.output.DeferredFileOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+import java.io.*;
+import java.util.*;
+
+public abstract class BufferSegmentQueue<T> extends AbstractQueue<T> {
+ protected static final int EOF = -1;
+
+ protected final int index;
+ protected final int thresholdBytes;
+ protected final boolean hasMaxQueueSize;
+ protected long totalResultSize = 0;
+ protected int maxResultSize = 0;
+
+ protected File file;
+ private boolean isClosed = false;
+ protected boolean flushBuffer = false;
+ protected int flushedCount = 0;
+
+ private T current = null;
+
+ protected SegmentQueueFileIterator thisIterator;
+ // iterators to close on close()
+ protected List<SegmentQueueFileIterator> iterators;
+
+ public BufferSegmentQueue(int index, int thresholdBytes, boolean
hasMaxQueueSize) {
+ this.index = index;
+ this.thresholdBytes = thresholdBytes;
+ this.hasMaxQueueSize = hasMaxQueueSize;
+ this.iterators = Lists.<SegmentQueueFileIterator> newArrayList();
+ }
+
+ abstract protected Queue<T> getInMemoryQueue();
+ abstract protected int sizeOf(T e);
+
+
+ public int index() {
+ return this.index;
+ }
+
+ public int size() {
+ if (flushBuffer)
+ return flushedCount;
+ return getInMemoryQueue().size();
+ }
+
+ public long getInMemByteSize() {
+ if (flushBuffer)
+ return 0;
+ return totalResultSize;
+ }
+
+ public boolean isFlushed() {
+ return flushBuffer;
+ }
+
+ @Override
+ public boolean offer(T e) {
+ if (isClosed || flushBuffer)
+ return false;
+
+ boolean added = getInMemoryQueue().add(e);
+ if (added) {
+ try {
+ flush(e);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ return added;
+ }
+
+ @Override
+ public T peek() {
+ if (current == null && !isClosed) {
+ current = next();
+ }
+
+ return current;
+ }
+
+ @Override
+ public T poll() {
+ T ret = peek();
+ if (!isClosed) {
+ current = next();
+ } else {
+ current = null;
+ }
+
+ return ret;
+ }
+
+ @Override
+ public Iterator<T> iterator() {
+ if (isClosed)
+ return null;
+
+ if (!flushBuffer)
+ return getInMemoryQueue().iterator();
+
+ SegmentQueueFileIterator iterator =
createSegmentQueueFileIterator(thisIterator);
--- End diff --
Just realized this existing implementation could be very inefficient, by
creating a new InputStream and skipping a few bytes every time this is called.
Is it possible to always return the same iterator with the current read state?
I guess it will just work after using the SpoolingResultIterator logic.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---