SAMOA-58: The issue described in https://issues.apache.org/jira/browse/SAMOA-58 turned out to be more complicated than the previous commit assumed. Although that commit did succeed in replacing the first exhausted file stream with a new one, the loader itself was not updated and would return null. This rework of AvroFileStream, FileStream and ArffFileStream aims to clean things up and to support multi-file streams of either type (Avro or ARFF).
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/6e81a62d Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/6e81a62d Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/6e81a62d Branch: refs/heads/master Commit: 6e81a62d776545500cf269457032a69f81f1a987 Parents: 16046cc Author: edi_bice <[email protected]> Authored: Fri Feb 19 11:55:03 2016 -0500 Committer: Gianmarco De Francisci Morales <[email protected]> Committed: Tue Apr 19 11:33:42 2016 +0300 ---------------------------------------------------------------------- .../apache/samoa/streams/ArffFileStream.java | 36 +++++++++++++------- .../apache/samoa/streams/AvroFileStream.java | 6 ++-- .../org/apache/samoa/streams/FileStream.java | 31 ++++------------- 3 files changed, 32 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/6e81a62d/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java index 9f8a322..417eb2e 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java @@ -20,7 +20,9 @@ package org.apache.samoa.streams; * #L% */ +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStreamReader; import org.apache.samoa.instances.Instances; import org.apache.samoa.moa.core.InstanceExample; @@ -44,6 +46,7 @@ public class ArffFileStream extends FileStream { -1, -1, Integer.MAX_VALUE); protected InstanceExample lastInstanceRead; + private BufferedReader fileReader; @Override public void prepareForUseImpl(TaskMonitor monitor, 
ObjectRepository repository) { @@ -56,32 +59,39 @@ public class ArffFileStream extends FileStream { @Override protected void reset() { try { - if (this.fileReader != null) - this.fileReader.close(); - fileSource.reset(); } catch (IOException ioe) { throw new RuntimeException("FileStream restart failed.", ioe); } - if (!getNextFileReader()) { + if (!getNextFileStream()) { hitEndOfStream = true; throw new RuntimeException("FileStream is empty."); } } @Override - protected boolean getNextFileReader() { - boolean ret = super.getNextFileReader(); - if (ret) { - this.instances = new Instances(this.fileReader, 1, -1); - if (this.classIndexOption.getValue() < 0) { - this.instances.setClassIndex(this.instances.numAttributes() - 1); - } else if (this.classIndexOption.getValue() > 0) { - this.instances.setClassIndex(this.classIndexOption.getValue() - 1); + protected boolean getNextFileStream() { + if (this.fileReader != null) + try { + this.fileReader.close(); + } catch (IOException ioe) { + ioe.printStackTrace(); } + + this.inputStream = this.fileSource.getNextInputStream(); + if (inputStream == null) + return false; + + this.fileReader = new BufferedReader(new InputStreamReader(this.inputStream)); + this.instances = new Instances(this.fileReader, 1, -1); + if (this.classIndexOption.getValue() < 0) { + this.instances.setClassIndex(this.instances.numAttributes() - 1); + } else if (this.classIndexOption.getValue() > 0) { + this.instances.setClassIndex(this.classIndexOption.getValue() - 1); } - return ret; + + return true; } @Override http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/6e81a62d/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java index 5b4e755..59bf22b 100644 --- 
a/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java @@ -54,7 +54,7 @@ public class AvroFileStream extends FileStream { protected InstanceExample lastInstanceRead; /** Represents the binary input stream of avro data **/ - protected transient InputStream inputStream = null; + //protected transient InputStream inputStream = null; /** The extension to be considered for the files **/ private static final String AVRO_FILE_EXTENSION = "avro"; @@ -87,6 +87,7 @@ public class AvroFileStream extends FileStream { * * @return */ + @Override protected boolean getNextFileStream() { if (this.inputStream != null) try { @@ -97,8 +98,7 @@ public class AvroFileStream extends FileStream { } this.inputStream = this.fileSource.getNextInputStream(); - - if (this.inputStream == null) + if (inputStream == null) return false; this.instances = new Instances(this.inputStream, classIndexOption.getValue(), encodingFormatOption.getValue()); http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/6e81a62d/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java index 2998b22..cfa8de5 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/FileStream.java @@ -52,7 +52,8 @@ public abstract class FileStream extends AbstractOptionHandler implements Instan "LocalFileStreamSource"); protected transient FileStreamSource fileSource; - protected transient Reader fileReader; + //protected transient Reader fileReader; + protected transient InputStream inputStream; protected Instances instances; protected boolean hitEndOfStream; @@ -81,7 +82,7 @@ public abstract class FileStream extends AbstractOptionHandler 
implements Instan @Override public boolean hasMoreInstances() { if (this.hitEndOfStream) { - if (getNextFileReader()) { + if (getNextFileStream()) { this.hitEndOfStream = false; return hasMoreInstances(); } else { @@ -115,38 +116,18 @@ public abstract class FileStream extends AbstractOptionHandler implements Instan protected void reset() { try { - if (this.fileReader != null) - this.fileReader.close(); - fileSource.reset(); } catch (IOException ioe) { throw new RuntimeException("FileStream restart failed.", ioe); } - if (!getNextFileReader()) { + if (!getNextFileStream()) { hitEndOfStream = true; throw new RuntimeException("FileStream is empty."); } - - this.instances = new Instances(this.fileReader, 1, -1); - this.instances.setClassIndex(this.instances.numAttributes() - 1); } - protected boolean getNextFileReader() { - if (this.fileReader != null) - try { - this.fileReader.close(); - } catch (IOException ioe) { - ioe.printStackTrace(); - } - - InputStream inputStream = this.fileSource.getNextInputStream(); - if (inputStream == null) - return false; - - this.fileReader = new BufferedReader(new InputStreamReader(inputStream)); - return true; - } + protected abstract boolean getNextFileStream(); protected boolean readNextInstanceFromStream() { if (!hasStarted) { @@ -158,7 +139,7 @@ public abstract class FileStream extends AbstractOptionHandler implements Instan if (readNextInstanceFromFile()) return true; - if (!getNextFileReader()) { + if (!getNextFileStream()) { this.hitEndOfStream = true; return false; }
