Updated Branches:
  refs/heads/trunk 32fef9342 -> 70fe963ec

FLUME-1828: ResettableInputStream should support seek()

(Mike Percy via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/70fe963e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/70fe963e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/70fe963e

Branch: refs/heads/trunk
Commit: 70fe963ecab62bebab76b9f97fbeb312b1176f9f
Parents: 32fef93
Author: Brock Noland <[email protected]>
Authored: Mon Jan 14 11:48:31 2013 -0800
Committer: Brock Noland <[email protected]>
Committed: Mon Jan 14 11:48:31 2013 -0800

----------------------------------------------------------------------
 .../serialization/ResettableFileInputStream.java   |   34 ++++--
 .../flume/serialization/ResettableInputStream.java |   36 +++++--
 .../org/apache/flume/serialization/Seekable.java   |   25 +++++
 .../ResettableTestStringInputStream.java           |   12 ++-
 .../TestResettableFileInputStream.java             |   82 ++++++++++++++-
 5 files changed, 168 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
index f9e4ec9..49521ab 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableFileInputStream.java
@@ -44,7 +44,7 @@ import java.nio.charset.CoderResult;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class ResettableFileInputStream implements ResettableInputStream {
+public class ResettableFileInputStream extends ResettableInputStream {
 
   public static final int DEFAULT_BUF_SIZE = 16384;
 
@@ -183,6 +183,7 @@ public class ResettableFileInputStream implements ResettableInputStream {
 
   private void refillBuf() throws IOException {
     buf.compact();
+    chan.position(position); // ensure we read from the proper offset
     chan.read(buf);
     buf.flip();
   }
@@ -197,27 +198,40 @@ public class ResettableFileInputStream implements ResettableInputStream {
     seek(tracker.getPosition());
   }
 
-  private long tell() throws IOException {
+  @Override
+  public long tell() throws IOException {
     return syncPosition;
   }
 
-  private synchronized void seek(long position) throws IOException {
-    // perform underlying file seek
-    chan.position(position);
+  @Override
+  public synchronized void seek(long newPos) throws IOException {
 
-    // invalidate cache
-    buf.clear();
-    buf.flip();
+    // check to see if we can seek within our existing buffer
+    long relativeChange = newPos - position;
+    if (relativeChange == 0) return; // seek to current pos => no-op
+
+    long newBufPos = buf.position() + relativeChange;
+    if (newBufPos >= 0 && newBufPos < buf.limit()) {
+      // we can reuse the read buffer
+      buf.position((int)newBufPos);
+    } else {
+      // otherwise, we have to invalidate the read buffer
+      buf.clear();
+      buf.flip();
+    }
 
     // clear decoder state
     decoder.reset();
 
+    // perform underlying file seek
+    chan.position(newPos);
+
     // reset position pointers
-    this.position = this.syncPosition = position;
+    position = syncPosition = newPos;
   }
 
   private void incrPosition(int incr, boolean updateSyncPosition) {
-    this.position += incr;
+    position += incr;
     if (updateSyncPosition) {
       syncPosition = position;
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java
index ae989a6..ddebc30 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/ResettableInputStream.java
@@ -24,16 +24,20 @@ import java.io.Closeable;
 import java.io.IOException;
 
 /**
- * <p> This abstract class defines an interface on top of InputStream for which
+ * <p> This abstract class defines an interface for which
 * the stream may be <code>mark</code>ed and <code>reset</code> with no limit to
  * the number of bytes which may have been read between the calls.
  *
  * <p> Any implementation of this interface guarantees that the mark position
  * will not be invalidated by reading any number of bytes.
+ *
+ * <p> Warning: We reserve the right to add public methods to this class in
+ * the future. Third-party subclasses beware.
  */
 @InterfaceAudience.Public
[email protected]
-public interface ResettableInputStream extends Resettable, Closeable {
+@InterfaceStability.Evolving
+public abstract class ResettableInputStream implements Resettable, Seekable,
+    Closeable {
 
   /**
    * Read a single byte of data from the stream.
@@ -41,7 +45,7 @@ public interface ResettableInputStream extends Resettable, Closeable {
    * been reached.
    * @throws IOException
    */
-  public int read() throws IOException;
+  public abstract int read() throws IOException;
 
   /**
    * Read multiple bytes of data from the stream.
@@ -52,7 +56,7 @@ public interface ResettableInputStream extends Resettable, Closeable {
    * the end of the stream has been reached.
    * @throws IOException
    */
-  public int read(byte[] b, int off, int len) throws IOException;
+  public abstract int read(byte[] b, int off, int len) throws IOException;
 
   /**
    * <p>Read a single character.
@@ -66,7 +70,7 @@ public interface ResettableInputStream extends Resettable, Closeable {
    *         (0x00-0xffff), or -1 if the end of the stream has been reached
    * @throws IOException
    */
-  public int readChar() throws IOException;
+  public abstract int readChar() throws IOException;
 
   /**
    * Marks the current position in this input stream. A subsequent call to the
@@ -81,16 +85,30 @@ public interface ResettableInputStream extends Resettable, Closeable {
    * @see java.io.InputStream#reset()
    */
   @Override
-  public void mark() throws IOException;
+  public abstract void mark() throws IOException;
 
   /**
    * Reset stream position to that set by {@link #mark()}
    * @throws IOException
    */
   @Override
-  public void reset() throws IOException;
+  public abstract void reset() throws IOException;
+
+  /**
+   * Seek to the specified byte position in the stream.
+   * @param position Absolute byte offset to seek to
+   */
+  @Override
+  public abstract void seek(long position) throws IOException;
+
+  /**
+   * Tell the current byte position.
+   * @return the current absolute byte position in the stream
+   */
+  @Override
+  public abstract long tell() throws IOException;
 
   @Override
-  public void close() throws IOException;
+  public abstract void close() throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java b/flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java
new file mode 100644
index 0000000..513f29a
--- /dev/null
+++ b/flume-ng-core/src/main/java/org/apache/flume/serialization/Seekable.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.serialization;
+
+import java.io.IOException;
+
+public interface Seekable {
+  void seek(long position) throws IOException;
+  long tell() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java
index ef8b7ba..03c8dda 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/ResettableTestStringInputStream.java
@@ -19,7 +19,7 @@ package org.apache.flume.serialization;
 
 import java.io.IOException;
 
-public class ResettableTestStringInputStream implements ResettableInputStream {
+public class ResettableTestStringInputStream extends ResettableInputStream {
 
   private String str;
   int markPos = 0;
@@ -52,6 +52,16 @@ public class ResettableTestStringInputStream implements ResettableInputStream {
   }
 
   @Override
+  public void seek(long position) throws IOException {
+    throw new UnsupportedOperationException("Unimplemented in test class");
+  }
+
+  @Override
+  public long tell() throws IOException {
+    throw new UnsupportedOperationException("Unimplemented in test class");
+  }
+
+  @Override
   public int read() throws IOException {
     throw new UnsupportedOperationException("This test class doesn't return " +
         "bytes!");

http://git-wip-us.apache.org/repos/asf/flume/blob/70fe963e/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
index 73e2baa..5ad6a0a 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
@@ -30,8 +30,11 @@ import org.slf4j.LoggerFactory;
 import static org.junit.Assert.*;
 import static org.junit.Assert.assertEquals;
 
+import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.charset.Charset;
 import java.util.List;
 
@@ -102,7 +105,7 @@ public class TestResettableFileInputStream {
     assertEquals(output, result2);
 
     String result3 = readLine(in, output.length());
-    assertNull(result3);
+    assertNull("Should be null: " + result3, result3);
 
     in.close();
   }
@@ -173,6 +176,59 @@ public class TestResettableFileInputStream {
     Assert.assertEquals(result3, result3a);
   }
 
+  @Test
+  public void testSeek() throws IOException {
+    int NUM_LINES = 1000;
+    int LINE_LEN = 1000;
+    generateData(file, Charsets.UTF_8, NUM_LINES, LINE_LEN);
+
+    PositionTracker tracker = new DurablePositionTracker(meta, file.getPath());
+    ResettableInputStream in = new ResettableFileInputStream(file, tracker,
+        10 * LINE_LEN, Charsets.UTF_8);
+
+    String line = "";
+    for (int i = 0; i < 9; i++) {
+      line = readLine(in, LINE_LEN);
+    }
+    int lineNum = Integer.parseInt(line.substring(0, 10));
+    assertEquals(8, lineNum);
+
+    // seek back within our buffer
+    long pos = in.tell();
+    in.seek(pos - 2 * LINE_LEN); // jump back 2 lines
+
+    line = readLine(in, LINE_LEN);
+    lineNum = Integer.parseInt(line.substring(0, 10));
+    assertEquals(7, lineNum);
+
+    // seek forward within our buffer
+    in.seek(in.tell() + LINE_LEN);
+    line = readLine(in, LINE_LEN);
+    lineNum = Integer.parseInt(line.substring(0, 10));
+    assertEquals(9, lineNum);
+
+    // seek forward outside our buffer
+    in.seek(in.tell() + 20 * LINE_LEN);
+    line = readLine(in, LINE_LEN);
+    lineNum = Integer.parseInt(line.substring(0, 10));
+    assertEquals(30, lineNum);
+
+    // seek backward outside our buffer
+    in.seek(in.tell() - 25 * LINE_LEN);
+    line = readLine(in, LINE_LEN);
+    lineNum = Integer.parseInt(line.substring(0, 10));
+    assertEquals(6, lineNum);
+
+    // test a corner-case seek which requires a buffer refill
+    in.seek(100 * LINE_LEN);
+    in.seek(0); // reset buffer
+
+    in.seek(9 * LINE_LEN);
+    assertEquals(9, Integer.parseInt(readLine(in, LINE_LEN).substring(0, 10)));
+    assertEquals(10, Integer.parseInt(readLine(in, LINE_LEN).substring(0, 10)));
+    assertEquals(11, Integer.parseInt(readLine(in, LINE_LEN).substring(0, 10)));
+  }
+
   /**
    * Helper function to read a line from a character stream.
    * @param in
@@ -229,4 +285,28 @@ public class TestResettableFileInputStream {
     return lines;
   }
 
+  private static void generateData(File file, Charset charset,
+      int numLines, int lineLen) throws IOException {
+
+    OutputStream out = new BufferedOutputStream(new FileOutputStream(file));
+    StringBuilder junk = new StringBuilder();
+    for (int x = 0; x < lineLen - 13; x++) {
+      junk.append('x');
+    }
+    String payload = junk.toString();
+    StringBuilder builder = new StringBuilder();
+    for (int i = 0; i < numLines; i++) {
+      builder.append(String.format("%010d: %s\n", i, payload));
+      if (i % 1000 == 0 && i != 0) {
+        out.write(builder.toString().getBytes(charset));
+        builder.setLength(0);
+      }
+    }
+
+    out.write(builder.toString().getBytes(charset));
+    out.close();
+
+    Assert.assertEquals(lineLen * numLines, file.length());
+  }
+
 }

Reply via email to