Updated Branches: refs/heads/trunk 32fef9342 -> 70fe963ec
FLUME-1828: ResettableInputStream should support seek() (Mike Percy via Brock Noland) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/70fe963e Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/70fe963e Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/70fe963e Branch: refs/heads/trunk Commit: 70fe963ecab62bebab76b9f97fbeb312b1176f9f Parents: 32fef93 Author: Brock Noland <[email protected]> Authored: Mon Jan 14 11:48:31 2013 -0800 Committer: Brock Noland <[email protected]> Committed: Mon Jan 14 11:48:31 2013 -0800 ---------------------------------------------------------------------- .../serialization/ResettableFileInputStream.java | 34 ++++-- .../flume/serialization/ResettableInputStream.java | 36 +++++-- .../org/apache/flume/serialization/Seekable.java | 25 +++++ .../ResettableTestStringInputStream.java | 12 ++- .../TestResettableFileInputStream.java | 82 ++++++++++++++- 5 files changed, 168 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java index f9e4ec9..49521ab 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java @@ -44,7 +44,7 @@ import java.nio.charset.CoderResult; */ @InterfaceAudience.Private @InterfaceStability.Evolving -public class ResettableFileInputStream implements ResettableInputStream { +public class ResettableFileInputStream extends ResettableInputStream { public static final int DEFAULT_BUF_SIZE = 16384; @@ -183,6 +183,7 @@ public class ResettableFileInputStream implements ResettableInputStream { private void refillBuf() throws IOException { buf.compact(); + chan.position(position); // ensure we read from the proper offset chan.read(buf); buf.flip(); } @@ -197,27 +198,40 @@ public class ResettableFileInputStream implements ResettableInputStream { seek(tracker.getPosition()); } - private long tell() throws IOException { + @Override + public long tell() throws IOException { return syncPosition; } - private synchronized void seek(long position) throws IOException { - // perform underlying file seek - chan.position(position); + @Override + public synchronized void seek(long newPos) throws IOException { - // invalidate cache - buf.clear(); - buf.flip(); + // check to see if we can seek within our existing buffer + long relativeChange = newPos - position; + if (relativeChange == 0) return; // seek to current pos => no-op + + long newBufPos = buf.position() + relativeChange; + if (newBufPos >= 0 && newBufPos < buf.limit()) { + // we can reuse the read buffer + buf.position((int)newBufPos); + } else { + // otherwise, we have to invalidate the read buffer + buf.clear(); + buf.flip(); + } // clear decoder state decoder.reset(); + // perform underlying file seek + chan.position(newPos); + // reset position pointers - this.position = this.syncPosition = position; + position = syncPosition = newPos; } private void incrPosition(int incr, boolean updateSyncPosition) { - this.position += incr; + position += incr; if (updateSyncPosition) { syncPosition = position; } http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java index ae989a6..ddebc30 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java @@ -24,16 +24,20 @@ import java.io.Closeable; import java.io.IOException; /** - * <p> This abstract class defines an interface on top of InputStream for which + * <p> This abstract class defines an interface for which * the stream may be <code>mark</code>ed and <code>reset</code> with no limit to * the number of bytes which may have been read between the calls. * * <p> Any implementation of this interface guarantees that the mark position * will not be invalidated by reading any number of bytes. + * + * <p> Warning: We reserve the right to add public methods to this class in + * the future. Third-party subclasses beware. */ @InterfaceAudience.Public [email protected] -public interface ResettableInputStream extends Resettable, Closeable { [email protected] +public abstract class ResettableInputStream implements Resettable, Seekable, + Closeable { /** * Read a single byte of data from the stream. @@ -41,7 +45,7 @@ public interface ResettableInputStream extends Resettable, Closeable { * been reached. * @throws IOException */ - public int read() throws IOException; + public abstract int read() throws IOException; /** * Read multiple bytes of data from the stream. @@ -52,7 +56,7 @@ public interface ResettableInputStream extends Resettable, Closeable { * the end of the stream has been reached. * @throws IOException */ - public int read(byte[] b, int off, int len) throws IOException; + public abstract int read(byte[] b, int off, int len) throws IOException; /** * <p>Read a single character. @@ -66,7 +70,7 @@ public interface ResettableInputStream extends Resettable, Closeable { * (0x00-0xffff), or -1 if the end of the stream has been reached * @throws IOException */ - public int readChar() throws IOException; + public abstract int readChar() throws IOException; /** * Marks the current position in this input stream. A subsequent call to the @@ -81,16 +85,30 @@ public interface ResettableInputStream extends Resettable, Closeable { * @see java.io.InputStream#reset() */ @Override - public void mark() throws IOException; + public abstract void mark() throws IOException; /** * Reset stream position to that set by {@link #mark()} * @throws IOException */ @Override - public void reset() throws IOException; + public abstract void reset() throws IOException; + + /** + * Seek to the specified byte position in the stream. + * @param position Absolute byte offset to seek to + */ + @Override + public abstract void seek(long position) throws IOException; + + /** + * Tell the current byte position. + * @return the current absolute byte position in the stream + */ + @Override + public abstract long tell() throws IOException; @Override - public void close() throws IOException; + public abstract void close() throws IOException; } http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java new file mode 100644 index 0000000..513f29a --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flume.serialization; + +import java.io.IOException; + +public interface Seekable { + void seek(long position) throws IOException; + long tell() throws IOException; +} http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java index ef8b7ba..03c8dda 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java @@ -19,7 +19,7 @@ package org.apache.flume.serialization; import java.io.IOException; -public class ResettableTestStringInputStream implements ResettableInputStream { +public class ResettableTestStringInputStream extends ResettableInputStream { private String str; int markPos = 0; @@ -52,6 +52,16 @@ public class ResettableTestStringInputStream implements ResettableInputStream { } @Override + public void seek(long position) throws IOException { + throw new UnsupportedOperationException("Unimplemented in test class"); + } + + @Override + public long tell() throws IOException { + throw new UnsupportedOperationException("Unimplemented in test class"); + } + + @Override public int read() throws IOException { throw new UnsupportedOperationException("This test class doesn't return " + "bytes!"); http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java index 73e2baa..5ad6a0a 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java +++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java @@ -30,8 +30,11 @@ import org.slf4j.LoggerFactory; import static org.junit.Assert.*; import static org.junit.Assert.assertEquals; +import java.io.BufferedOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.nio.charset.Charset; import java.util.List; @@ -102,7 +105,7 @@ public class TestResettableFileInputStream { assertEquals(output, result2); String result3 = readLine(in, output.length()); - assertNull(result3); + assertNull("Should be null: " + result3, result3); in.close(); } @@ -173,6 +176,59 @@ public class TestResettableFileInputStream { Assert.assertEquals(result3, result3a); } + @Test + public void testSeek() throws IOException { + int NUM_LINES = 1000; + int LINE_LEN = 1000; + generateData(file, Charsets.UTF_8, NUM_LINES, LINE_LEN); + + PositionTracker tracker = new DurablePositionTracker(meta, file.getPath()); + ResettableInputStream in = new ResettableFileInputStream(file, tracker, + 10 * LINE_LEN, Charsets.UTF_8); + + String line = ""; + for (int i = 0; i < 9; i++) { + line = readLine(in, LINE_LEN); + } + int lineNum = Integer.parseInt(line.substring(0, 10)); + assertEquals(8, lineNum); + + // seek back within our buffer + long pos = in.tell(); + in.seek(pos - 2 * LINE_LEN); // jump back 2 lines + + line = readLine(in, LINE_LEN); + lineNum = Integer.parseInt(line.substring(0, 10)); + assertEquals(7, lineNum); + + // seek forward within our buffer + in.seek(in.tell() + LINE_LEN); + line = readLine(in, LINE_LEN); + lineNum = Integer.parseInt(line.substring(0, 10)); + assertEquals(9, lineNum); + + // seek forward outside our buffer + in.seek(in.tell() + 20 * LINE_LEN); + line = readLine(in, LINE_LEN); + lineNum = Integer.parseInt(line.substring(0, 10)); + assertEquals(30, lineNum); + + // seek backward outside our buffer + in.seek(in.tell() - 25 * LINE_LEN); + line = readLine(in, LINE_LEN); + lineNum = Integer.parseInt(line.substring(0, 10)); + assertEquals(6, lineNum); + + // test a corner-case seek which requires a buffer refill + in.seek(100 * LINE_LEN); + in.seek(0); // reset buffer + + in.seek(9 * LINE_LEN); + assertEquals(9, Integer.parseInt(readLine(in, LINE_LEN).substring(0, 10))); + assertEquals(10, Integer.parseInt(readLine(in, LINE_LEN).substring(0, 10))); + assertEquals(11, Integer.parseInt(readLine(in, LINE_LEN).substring(0, 10))); + } + /** * Helper function to read a line from a character stream. * @param in @@ -229,4 +285,28 @@ public class TestResettableFileInputStream { return lines; } + private static void generateData(File file, Charset charset, + int numLines, int lineLen) throws IOException { + + OutputStream out = new BufferedOutputStream(new FileOutputStream(file)); + StringBuilder junk = new StringBuilder(); + for (int x = 0; x < lineLen - 13; x++) { + junk.append('x'); + } + String payload = junk.toString(); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < numLines; i++) { + builder.append(String.format("%010d: %s\n", i, payload)); + if (i % 1000 == 0 && i != 0) { + out.write(builder.toString().getBytes(charset)); + builder.setLength(0); + } + } + + out.write(builder.toString().getBytes(charset)); + out.close(); + + Assert.assertEquals(lineLen * numLines, file.length()); + } + }
