Updated Branches: refs/heads/master ca2b9c06a -> 58383d3d0
CRUNCH-53: Support autoclosing iterators for text/seq/avro reader factories. Contributed by Shawn Smith. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/58383d3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/58383d3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/58383d3d Branch: refs/heads/master Commit: 58383d3d0e8346c17366beaea4c4543eec0b9c19 Parents: ca2b9c0 Author: Josh Wills <[email protected]> Authored: Tue Aug 28 16:44:42 2012 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Aug 28 16:44:42 2012 -0700 ---------------------------------------------------------------------- .../crunch/io/avro/AvroFileReaderFactory.java | 5 +- .../apache/crunch/io/impl/AutoClosingIterator.java | 62 +++++++++++++++ .../apache/crunch/io/seq/SeqFileReaderFactory.java | 5 +- .../crunch/io/seq/SeqFileTableReaderFactory.java | 5 +- .../crunch/io/text/TextFileReaderFactory.java | 7 +- 5 files changed, 75 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java index 982f6db..d1940cc 100644 --- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.MapFn; import org.apache.crunch.io.FileReaderFactory; +import org.apache.crunch.io.impl.AutoClosingIterator; import org.apache.crunch.types.avro.AvroType; 
import org.apache.crunch.types.avro.Avros; import org.apache.hadoop.conf.Configuration; @@ -73,7 +74,7 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> { try { FsInput fsi = new FsInput(path, fs.getConf()); final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader); - return new UnmodifiableIterator<T>() { + return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() { @Override public boolean hasNext() { return reader.hasNext(); @@ -83,7 +84,7 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> { public T next() { return mapFn.map(reader.next()); } - }; + }); } catch (IOException e) { LOG.info("Could not read avro file at path: " + path, e); return Iterators.emptyIterator(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java new file mode 100644 index 0000000..d58f290 --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.io.impl; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Iterator; + +import com.google.common.collect.UnmodifiableIterator; +import com.google.common.io.Closeables; + +/** + * Closes the wrapped {@code Closeable} when {@link #hasNext()} returns false. As long as a client loops through to + * completion (doesn't abort early due to an exception, short circuit, etc.) resources will be closed automatically. + */ +public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Iterator<T>, Closeable { + private final Iterator<T> iter; + private Closeable closeable; + + public AutoClosingIterator(Closeable closeable, Iterator<T> iter) { + this.closeable = closeable; + this.iter = iter; + } + + @Override + public boolean hasNext() { + if (!iter.hasNext()) { + Closeables.closeQuietly(this); + return false; + } else { + return true; + } + } + + @Override + public T next() { + return iter.next(); + } + + @Override + public void close() throws IOException { + if (closeable != null) { + closeable.close(); + closeable = null; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java index 47163e1..050c0fc 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.MapFn; import org.apache.crunch.io.FileReaderFactory; +import 
org.apache.crunch.io.impl.AutoClosingIterator; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -57,7 +58,7 @@ public class SeqFileReaderFactory<T> implements FileReaderFactory<T> { mapFn.initialize(); try { final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); - return new UnmodifiableIterator<T>() { + return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() { boolean nextChecked = false; boolean hasNext = false; @@ -84,7 +85,7 @@ public class SeqFileReaderFactory<T> implements FileReaderFactory<T> { nextChecked = false; return mapFn.map(value); } - }; + }); } catch (IOException e) { LOG.info("Could not read seqfile at path: " + path, e); return Iterators.emptyIterator(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java index 038142a..7c34a75 100644 --- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.crunch.MapFn; import org.apache.crunch.Pair; import org.apache.crunch.io.FileReaderFactory; +import org.apache.crunch.io.impl.AutoClosingIterator; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; @@ -64,7 +65,7 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K valueMapFn.initialize(); try { final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf); - return new UnmodifiableIterator<Pair<K, V>>() { + return 
new AutoClosingIterator<Pair<K, V>>(reader, new UnmodifiableIterator<Pair<K, V>>() { boolean nextChecked = false; boolean hasNext = false; @@ -91,7 +92,7 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K nextChecked = false; return Pair.of(keyMapFn.map(key), valueMapFn.map(value)); } - }; + }); } catch (IOException e) { LOG.info("Could not read seqfile at path: " + path, e); return Iterators.emptyIterator(); http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java index d22b233..5a512fc 100644 --- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java +++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java @@ -28,6 +28,7 @@ import org.apache.crunch.MapFn; import org.apache.crunch.fn.CompositeMapFn; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.io.FileReaderFactory; +import org.apache.crunch.io.impl.AutoClosingIterator; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -65,7 +66,7 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> { mapFn.setConfigurationForTest(conf); mapFn.initialize(); - FSDataInputStream is = null; + FSDataInputStream is; try { is = fs.open(path); } catch (IOException e) { @@ -75,7 +76,7 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> { final BufferedReader reader = new BufferedReader(new InputStreamReader(is)); final MapFn<String, T> iterMapFn = mapFn; - return new UnmodifiableIterator<T>() { + return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() { private String nextLine; @Override @@ 
-92,6 +93,6 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> { public T next() { return iterMapFn.map(nextLine); } - }; + }); } }
