SAMOA-14: Consolidate ARFFFileStream
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/dc2b7bc3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/dc2b7bc3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/dc2b7bc3 Branch: refs/heads/master Commit: dc2b7bc30f05fd2d3899827d3adb3818960084a8 Parents: 1bf3c02 Author: Gianmarco De Francisci Morales <[email protected]> Authored: Sun Mar 6 15:22:37 2016 +0300 Committer: Gianmarco De Francisci Morales <[email protected]> Committed: Sun Mar 13 11:05:50 2016 +0300 ---------------------------------------------------------------------- .../samoa/moa/streams/ArffFileStream.java | 200 ------------------- .../samoa/moa/streams/AvroFileStream.java | 167 ---------------- .../apache/samoa/streams/ArffFileStream.java | 1 - .../apache/samoa/streams/AvroFileStream.java | 166 +++++++++++++++ .../samoa/streams/fs/HDFSFileStreamSource.java | 14 +- 5 files changed, 174 insertions(+), 374 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java b/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java deleted file mode 100644 index 3a17d61..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java +++ /dev/null @@ -1,200 +0,0 @@ -package org.apache.samoa.moa.streams; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.Reader; - -import org.apache.samoa.instances.Instances; -import org.apache.samoa.instances.InstancesHeader; -import org.apache.samoa.moa.core.InputStreamProgressMonitor; -import org.apache.samoa.moa.core.InstanceExample; -import org.apache.samoa.moa.core.ObjectRepository; -import org.apache.samoa.moa.options.AbstractOptionHandler; -import org.apache.samoa.moa.tasks.TaskMonitor; - -import com.github.javacliparser.FileOption; -import com.github.javacliparser.IntOption; - -/** - * Stream reader of ARFF files. - * - * @author Richard Kirkby ([email protected]) - * @version $Revision: 7 $ - */ -public class ArffFileStream extends AbstractOptionHandler implements InstanceStream { - - @Override - public String getPurposeString() { - return "A stream read from an ARFF file."; - } - - private static final long serialVersionUID = 1L; - - public FileOption arffFileOption = new FileOption("arffFile", 'f', - "ARFF file to load.", null, "arff", false); - - public IntOption classIndexOption = new IntOption( - "classIndex", - 'c', - "Class index of data. 0 for none or -1 for last attribute in file.", - -1, -1, Integer.MAX_VALUE); - - protected Instances instances; - - transient protected Reader fileReader; - - protected boolean hitEndOfFile; - - protected InstanceExample lastInstanceRead; - - protected int numInstancesRead; - - transient protected InputStreamProgressMonitor fileProgressMonitor; - - protected boolean hasStarted; - - public ArffFileStream() { - } - - public ArffFileStream(String arffFileName, int classIndex) { - this.arffFileOption.setValue(arffFileName); - this.classIndexOption.setValue(classIndex); - this.hasStarted = false; - restart(); - } - - @Override - public void prepareForUseImpl(TaskMonitor monitor, - ObjectRepository repository) { - // restart(); - this.hasStarted = false; - this.lastInstanceRead = null; - } - - @Override - public InstancesHeader getHeader() { - return new InstancesHeader(this.instances); - } - - @Override - public long estimatedRemainingInstances() { - double progressFraction = this.fileProgressMonitor.getProgressFraction(); - if ((progressFraction > 0.0) && (this.numInstancesRead > 0)) { - return (long) ((this.numInstancesRead / progressFraction) - this.numInstancesRead); - } - return -1; - } - - @Override - public boolean hasMoreInstances() { - return !this.hitEndOfFile; - } - - @Override - public InstanceExample nextInstance() { - if (this.lastInstanceRead == null) { - readNextInstanceFromFile(); - } - InstanceExample prevInstance = this.lastInstanceRead; - this.hitEndOfFile = !readNextInstanceFromFile(); - return prevInstance; - } - - @Override - public boolean isRestartable() { - return true; - } - - @Override - public void restart() { - try { - reset(); - // this.hitEndOfFile = !readNextInstanceFromFile(); - } catch (IOException ioe) { - throw new RuntimeException("ArffFileStream restart failed.", ioe); - } - } - - protected boolean readNextInstanceFromFile() { - boolean ret; - if (!this.hasStarted) { - try { - reset(); - ret = getNextInstanceFromFile(); - this.hitEndOfFile = !ret; - } catch (IOException ioe) { - throw new RuntimeException("ArffFileStream restart failed.", ioe); - } - this.hasStarted = true; - } else { - ret = getNextInstanceFromFile(); - } - return ret; - } - - @Override - public void getDescription(StringBuilder sb, int indent) { - // TODO Auto-generated method stub - } - - private void reset() throws IOException { - if (this.fileReader != null) { - this.fileReader.close(); - } - InputStream fileStream = new FileInputStream(this.arffFileOption.getFile()); - this.fileProgressMonitor = new InputStreamProgressMonitor( - fileStream); - this.fileReader = new BufferedReader(new InputStreamReader( - this.fileProgressMonitor)); - this.instances = new Instances(this.fileReader, 1, this.classIndexOption.getValue()); - if (this.classIndexOption.getValue() < 0) { - this.instances.setClassIndex(this.instances.numAttributes() - 1); - } else if (this.classIndexOption.getValue() > 0) { - this.instances.setClassIndex(this.classIndexOption.getValue() - 1); - } - this.numInstancesRead = 0; - this.lastInstanceRead = null; - } - - private boolean getNextInstanceFromFile() throws RuntimeException { - try { - if (this.instances.readInstance(this.fileReader)) { - this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); - this.instances.delete(); // keep instances clean - this.numInstancesRead++; - return true; - } - if (this.fileReader != null) { - this.fileReader.close(); - this.fileReader = null; - } - return false; - } catch (IOException ioe) { - throw new RuntimeException( - "ArffFileStream failed to read instance from stream.", ioe); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java deleted file mode 100644 index 74b31dd..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java +++ /dev/null @@ -1,167 +0,0 @@ -package org.apache.samoa.moa.streams; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.IOException; -import java.io.InputStream; - -import org.apache.samoa.instances.Instances; -import org.apache.samoa.moa.core.InstanceExample; -import org.apache.samoa.moa.core.ObjectRepository; -import org.apache.samoa.moa.tasks.TaskMonitor; -import org.apache.samoa.streams.FileStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.javacliparser.FileOption; -import com.github.javacliparser.IntOption; -import com.github.javacliparser.StringOption; - -/** - * InstanceStream implementation to handle Apache Avro Files. Handles both JSON & Binary encoded streams - * - * - */ -public class AvroFileStream extends FileStream { - - private static final long serialVersionUID = 1L; - private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class); - - public FileOption avroFileOption = new FileOption("avroFile", 'f', "Avro File(s) to load.", null, null, false); - public IntOption classIndexOption = new IntOption("classIndex", 'c', - "Class index of data. 0 for none or -1 for last attribute in file.", -1, -1, Integer.MAX_VALUE); - public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e', - "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY"); - - /** Represents the last read Instance **/ - protected InstanceExample lastInstanceRead; - - /** Represents the binary input stream of avro data **/ - protected transient InputStream inputStream = null; - - /** The extension to be considered for the files **/ - private static final String AVRO_FILE_EXTENSION = "avro"; - - /* (non-Javadoc) - * @see org.apache.samoa.streams.FileStream#reset() - * Reset the BINARY encoded Avro Stream & Close the file source - */ - @Override - protected void reset() { - - try { - if (this.inputStream != null) - this.inputStream.close(); - - fileSource.reset(); - } catch (IOException ioException) { - logger.error(AVRO_STREAM_FAILED_RESTART_ERROR + " : {}", ioException); - throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException); - } - - if (!getNextFileStream()) { - hitEndOfStream = true; - throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR); - } - } - - /** - * Get next File Stream & set the class index read from the command line option - * - * @return - */ - protected boolean getNextFileStream() { - if (this.inputStream != null) - try { - this.inputStream.close(); - } catch (IOException ioException) { - logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException); - throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); - } - - this.inputStream = this.fileSource.getNextInputStream(); - - if (this.inputStream == null) - return false; - - this.instances = new Instances(this.inputStream, classIndexOption.getValue(), encodingFormatOption.getValue()); - - if (this.classIndexOption.getValue() < 0) { - this.instances.setClassIndex(this.instances.numAttributes() - 1); - } else if (this.classIndexOption.getValue() > 0) { - this.instances.setClassIndex(this.classIndexOption.getValue() - 1); - } - return true; - } - - /* (non-Javadoc) - * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile() - * Read next Instance from File. Return false if unable to read next Instance - */ - @Override - protected boolean readNextInstanceFromFile() { - try { - if (this.instances.readInstance()) { - this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); - this.instances.delete(); - return true; - } - if (this.inputStream != null) { - this.inputStream.close(); - this.inputStream = null; - } - return false; - } catch (IOException ioException) { - logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException); - throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); - } - - } - - @Override - public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { - super.prepareForUseImpl(monitor, repository); - String filePath = this.avroFileOption.getFile().getAbsolutePath(); - this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION); - this.lastInstanceRead = null; - } - - /* (non-Javadoc) - * @see org.apache.samoa.streams.FileStream#getLastInstanceRead() - * Return the last read Instance - */ - @Override - protected InstanceExample getLastInstanceRead() { - return this.lastInstanceRead; - } - - @Override - public void getDescription(StringBuilder sb, int indent) { - throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD); - } - - /** Error Messages to for all types of Avro File Streams */ - protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed."; - protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty."; - protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream."; - protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet."; - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java index 099f639..9f8a322 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java @@ -33,7 +33,6 @@ import com.github.javacliparser.IntOption; /** * InstanceStream for ARFF file * - * @author Casey */ public class ArffFileStream extends FileStream { http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java b/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java new file mode 100644 index 0000000..15229a4 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java @@ -0,0 +1,166 @@ +package org.apache.samoa.streams; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.samoa.instances.Instances; +import org.apache.samoa.moa.core.InstanceExample; +import org.apache.samoa.moa.core.ObjectRepository; +import org.apache.samoa.moa.tasks.TaskMonitor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.FileOption; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; + +/** + * InstanceStream implementation to handle Apache Avro Files. Handles both JSON & Binary encoded streams + * + * + */ +public class AvroFileStream extends FileStream { + + private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(AvroFileStream.class); + + public FileOption avroFileOption = new FileOption("avroFile", 'f', "Avro File(s) to load.", null, null, false); + public IntOption classIndexOption = new IntOption("classIndex", 'c', + "Class index of data. 0 for none or -1 for last attribute in file.", -1, -1, Integer.MAX_VALUE); + public StringOption encodingFormatOption = new StringOption("encodingFormatOption", 'e', + "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY"); + + /** Represents the last read Instance **/ + protected InstanceExample lastInstanceRead; + + /** Represents the binary input stream of avro data **/ + protected transient InputStream inputStream = null; + + /** The extension to be considered for the files **/ + private static final String AVRO_FILE_EXTENSION = "avro"; + + /* (non-Javadoc) + * @see org.apache.samoa.streams.FileStream#reset() + * Reset the BINARY encoded Avro Stream & Close the file source + */ + @Override + protected void reset() { + + try { + if (this.inputStream != null) + this.inputStream.close(); + + fileSource.reset(); + } catch (IOException ioException) { + logger.error(AVRO_STREAM_FAILED_RESTART_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, ioException); + } + + if (!getNextFileStream()) { + hitEndOfStream = true; + throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR); + } + } + + /** + * Get next File Stream & set the class index read from the command line option + * + * @return + */ + protected boolean getNextFileStream() { + if (this.inputStream != null) + try { + this.inputStream.close(); + } catch (IOException ioException) { + logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); + } + + this.inputStream = this.fileSource.getNextInputStream(); + + if (this.inputStream == null) + return false; + + this.instances = new Instances(this.inputStream, classIndexOption.getValue(), encodingFormatOption.getValue()); + + if (this.classIndexOption.getValue() < 0) { + this.instances.setClassIndex(this.instances.numAttributes() - 1); + } else if (this.classIndexOption.getValue() > 0) { + this.instances.setClassIndex(this.classIndexOption.getValue() - 1); + } + return true; + } + + /* (non-Javadoc) + * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile() + * Read next Instance from File. Return false if unable to read next Instance + */ + @Override + protected boolean readNextInstanceFromFile() { + try { + if (this.instances.readInstance()) { + this.lastInstanceRead = new InstanceExample(this.instances.instance(0)); + this.instances.delete(); + return true; + } + if (this.inputStream != null) { + this.inputStream.close(); + this.inputStream = null; + } + return false; + } catch (IOException ioException) { + logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException); + throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException); + } + + } + + @Override + public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) { + super.prepareForUseImpl(monitor, repository); + String filePath = this.avroFileOption.getFile().getAbsolutePath(); + this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION); + this.lastInstanceRead = null; + } + + /* (non-Javadoc) + * @see org.apache.samoa.streams.FileStream#getLastInstanceRead() + * Return the last read Instance + */ + @Override + protected InstanceExample getLastInstanceRead() { + return this.lastInstanceRead; + } + + @Override + public void getDescription(StringBuilder sb, int indent) { + throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD); + } + + /** Error Messages to for all types of Avro File Streams */ + protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro FileStream restart failed."; + protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro FileStream is empty."; + protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to read next instance from Avro File Stream."; + protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method is not supported for AvroFileStream yet."; + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java b/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java index 00abd1a..67d5e79 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java @@ -40,8 +40,8 @@ import org.apache.hadoop.io.IOUtils; public class HDFSFileStreamSource implements FileStreamSource { /** - * - */ + * + */ private static final long serialVersionUID = -3887354805787167400L; private transient InputStream fileStream; @@ -59,6 +59,10 @@ public class HDFSFileStreamSource implements FileStreamSource { public void init(Configuration config, String path, String ext) { this.config = config; + config.set("fs.hdfs.impl", + org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()); + config.set("fs.file.impl", + org.apache.hadoop.fs.LocalFileSystem.class.getName()); this.filePaths = new ArrayList<String>(); Path hdfsPath = new Path(path); FileSystem fs; @@ -69,8 +73,7 @@ public class HDFSFileStreamSource implements FileStreamSource { Path filterPath = hdfsPath; if (ext != null) { filterPath = new Path(path.toString(), "*." + ext); - } - else { + } else { filterPath = new Path(path.toString(), "*"); } FileStatus[] filesInDir = fs.globStatus(filterPath); @@ -79,8 +82,7 @@ public class HDFSFileStreamSource implements FileStreamSource { filePaths.add(filesInDir[i].getPath().toString()); } } - } - else { + } else { this.filePaths.add(path); } } catch (IOException ioe) {
