Author: szetszwo
Date: Tue Dec 1 23:26:16 2009
New Revision: 886004
URL: http://svn.apache.org/viewvc?rev=886004&view=rev
Log:
HADOOP-6307. Add a new SequenceFile.Reader constructor in order to support
reading on un-closed file.
Modified:
hadoop/common/branches/branch-0.21/CHANGES.txt
hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java
hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java
Modified: hadoop/common/branches/branch-0.21/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.21/CHANGES.txt?rev=886004&r1=886003&r2=886004&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.21/CHANGES.txt Tue Dec 1 23:26:16 2009
@@ -636,6 +636,9 @@
HADOOP-6271. Add recursive and non recursive create and mkdir to
FileContext. (Sanjay Radia via suresh)
+ HADOOP-6307. Add a new SequenceFile.Reader constructor in order to support
+ reading on un-closed file. (szetszwo)
+
BUG FIXES
HADOOP-5379. CBZip2InputStream to throw IOException on data crc error.
Modified:
hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java?rev=886004&r1=886003&r2=886004&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java
(original)
+++
hadoop/common/branches/branch-0.21/src/java/org/apache/hadoop/io/SequenceFile.java
Tue Dec 1 23:26:16 2009
@@ -1435,32 +1435,71 @@
private DeserializerBase keyDeserializer;
private DeserializerBase valDeserializer;
- /** Open the named file. */
+ /**
+ * Construct a reader by opening a file from the given file system.
+ * @param fs The file system used to open the file.
+ * @param file The file being read.
+ * @param conf Configuration
+ * @throws IOException
+ */
public Reader(FileSystem fs, Path file, Configuration conf)
throws IOException {
this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
}
+ /**
+ * Construct a reader by the given input stream.
+ * @param in An input stream.
+ * @param buffersize The buffer size used to read the file.
+ * @param start The starting position.
+ * @param length The length being read.
+ * @param conf Configuration
+ * @throws IOException
+ */
+ public Reader(FSDataInputStream in, int buffersize,
+ long start, long length, Configuration conf) throws IOException {
+ this(null, null, in, buffersize, start, length, conf, false);
+ }
+
private Reader(FileSystem fs, Path file, int bufferSize,
Configuration conf, boolean tempReader) throws IOException {
- this(fs, file, bufferSize, 0, fs.getFileStatus(file).getLen(), conf,
tempReader);
+ this(fs, file, null, bufferSize, 0, fs.getFileStatus(file).getLen(),
+ conf, tempReader);
}
-
- private Reader(FileSystem fs, Path file, int bufferSize, long start,
- long length, Configuration conf, boolean tempReader)
- throws IOException {
+
+ /**
+ * Private constructor.
+ * @param fs The file system used to open the file.
+ * It is not used if the given input stream is not null.
+ * @param file The file being read.
+ * @param in An input stream of the file. If it is null,
+ * the file will be opened from the given file system.
+ * @param bufferSize The buffer size used to read the file.
+ * @param start The starting position.
+ * @param length The length being read.
+ * @param conf Configuration
+ * @param tempReader Is this temporary?
+ * @throws IOException
+ */
+ private Reader(FileSystem fs, Path file, FSDataInputStream in,
+ int bufferSize, long start, long length, Configuration conf,
+ boolean tempReader) throws IOException {
+ if (fs == null && in == null) {
+ throw new IllegalArgumentException("fs == null && in == null");
+ }
+
this.file = file;
- this.in = openFile(fs, file, bufferSize, length);
+ this.in = in != null? in: openFile(fs, file, bufferSize, length);
this.conf = conf;
boolean succeeded = false;
try {
seek(start);
- this.end = in.getPos() + length;
+ this.end = this.in.getPos() + length;
init(tempReader);
succeeded = true;
} finally {
if (!succeeded) {
- IOUtils.cleanup(LOG, in);
+ IOUtils.cleanup(LOG, this.in);
}
}
}
@@ -1468,6 +1507,13 @@
/**
* Override this method to specialize the type of
* {@link FSDataInputStream} returned.
+ * @param fs The file system used to open the file.
+ * @param file The file being read.
+ * @param bufferSize The buffer size used to read the file.
+ * @param length The length being read if it is >= 0. Otherwise,
+ * the length is not available.
+ * @return The opened stream.
+ * @throws IOException
*/
protected FSDataInputStream openFile(FileSystem fs, Path file,
int bufferSize, long length) throws IOException {
@@ -1489,7 +1535,7 @@
if ((versionBlock[0] != VERSION[0]) ||
(versionBlock[1] != VERSION[1]) ||
(versionBlock[2] != VERSION[2]))
- throw new IOException(file + " not a SequenceFile");
+ throw new IOException(this + " not a SequenceFile");
// Set 'version'
version = versionBlock[3];
@@ -2251,7 +2297,7 @@
/** Returns the name of the file. */
public String toString() {
- return file.toString();
+ return file == null? "<unknown>": file.toString();
}
}
@@ -3132,7 +3178,7 @@
if (fs.getUri().getScheme().startsWith("ramfs")) {
bufferSize = conf.getInt("io.bytes.per.checksum", 512);
}
- Reader reader = new Reader(fs, segmentPathName,
+ Reader reader = new Reader(fs, segmentPathName, null,
bufferSize, segmentOffset,
segmentLength, conf, false);
Modified:
hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java?rev=886004&r1=886003&r2=886004&view=diff
==============================================================================
---
hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java
(original)
+++
hadoop/common/branches/branch-0.21/src/test/core/org/apache/hadoop/io/TestSequenceFileSync.java
Tue Dec 1 23:26:16 2009
@@ -18,22 +18,17 @@
package org.apache.hadoop.io;
-import java.io.File;
+import static org.junit.Assert.assertEquals;
+
import java.io.IOException;
import java.util.Random;
-import java.net.URI;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.junit.Before;
import org.junit.Test;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import static org.junit.Assert.*;
-
public class TestSequenceFileSync {
private static final int NUMRECORDS = 2000;
private static final int RECORDSIZE = 80;
@@ -66,8 +61,18 @@
try {
writeSequenceFile(writer, NUMRECORDS);
for (int i = 0; i < 5 ; i++) {
- final SequenceFile.Reader reader =
- new SequenceFile.Reader(fs, path, conf);
+ final SequenceFile.Reader reader;
+
+ //try different SequenceFile.Reader constructors
+ if (i % 2 == 0) {
+ reader = new SequenceFile.Reader(fs, path, conf);
+ } else {
+ final FSDataInputStream in = fs.open(path);
+ final long length = fs.getFileStatus(path).getLen();
+ final int buffersize = conf.getInt("io.file.buffer.size", 4096);
+ reader = new SequenceFile.Reader(in, buffersize, 0L, length, conf);
+ }
+
try {
forOffset(reader, input, val, i, 0, 0);
forOffset(reader, input, val, i, 65, 0);