SAMOA-14: Consolidate ARFFFileStream

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/dc2b7bc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/dc2b7bc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/dc2b7bc3

Branch: refs/heads/master
Commit: dc2b7bc30f05fd2d3899827d3adb3818960084a8
Parents: 1bf3c02
Author: Gianmarco De Francisci Morales <[email protected]>
Authored: Sun Mar 6 15:22:37 2016 +0300
Committer: Gianmarco De Francisci Morales <[email protected]>
Committed: Sun Mar 13 11:05:50 2016 +0300

----------------------------------------------------------------------
 .../samoa/moa/streams/ArffFileStream.java       | 200 -------------------
 .../samoa/moa/streams/AvroFileStream.java       | 167 ----------------
 .../apache/samoa/streams/ArffFileStream.java    |   1 -
 .../apache/samoa/streams/AvroFileStream.java    | 166 +++++++++++++++
 .../samoa/streams/fs/HDFSFileStreamSource.java  |  14 +-
 5 files changed, 174 insertions(+), 374 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java
deleted file mode 100644
index 3a17d61..0000000
--- a/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java
+++ /dev/null
@@ -1,200 +0,0 @@
-package org.apache.samoa.moa.streams;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.Reader;
-
-import org.apache.samoa.instances.Instances;
-import org.apache.samoa.instances.InstancesHeader;
-import org.apache.samoa.moa.core.InputStreamProgressMonitor;
-import org.apache.samoa.moa.core.InstanceExample;
-import org.apache.samoa.moa.core.ObjectRepository;
-import org.apache.samoa.moa.options.AbstractOptionHandler;
-import org.apache.samoa.moa.tasks.TaskMonitor;
-
-import com.github.javacliparser.FileOption;
-import com.github.javacliparser.IntOption;
-
-/**
- * Stream reader of ARFF files.
- * 
- * @author Richard Kirkby ([email protected])
- * @version $Revision: 7 $
- */
-public class ArffFileStream extends AbstractOptionHandler implements 
InstanceStream {
-
-  @Override
-  public String getPurposeString() {
-    return "A stream read from an ARFF file.";
-  }
-
-  private static final long serialVersionUID = 1L;
-
-  public FileOption arffFileOption = new FileOption("arffFile", 'f',
-      "ARFF file to load.", null, "arff", false);
-
-  public IntOption classIndexOption = new IntOption(
-      "classIndex",
-      'c',
-      "Class index of data. 0 for none or -1 for last attribute in file.",
-      -1, -1, Integer.MAX_VALUE);
-
-  protected Instances instances;
-
-  transient protected Reader fileReader;
-
-  protected boolean hitEndOfFile;
-
-  protected InstanceExample lastInstanceRead;
-
-  protected int numInstancesRead;
-
-  transient protected InputStreamProgressMonitor fileProgressMonitor;
-
-  protected boolean hasStarted;
-
-  public ArffFileStream() {
-  }
-
-  public ArffFileStream(String arffFileName, int classIndex) {
-    this.arffFileOption.setValue(arffFileName);
-    this.classIndexOption.setValue(classIndex);
-    this.hasStarted = false;
-    restart();
-  }
-
-  @Override
-  public void prepareForUseImpl(TaskMonitor monitor,
-      ObjectRepository repository) {
-    // restart();
-    this.hasStarted = false;
-    this.lastInstanceRead = null;
-  }
-
-  @Override
-  public InstancesHeader getHeader() {
-    return new InstancesHeader(this.instances);
-  }
-
-  @Override
-  public long estimatedRemainingInstances() {
-    double progressFraction = this.fileProgressMonitor.getProgressFraction();
-    if ((progressFraction > 0.0) && (this.numInstancesRead > 0)) {
-      return (long) ((this.numInstancesRead / progressFraction) - 
this.numInstancesRead);
-    }
-    return -1;
-  }
-
-  @Override
-  public boolean hasMoreInstances() {
-    return !this.hitEndOfFile;
-  }
-
-  @Override
-  public InstanceExample nextInstance() {
-    if (this.lastInstanceRead == null) {
-      readNextInstanceFromFile();
-    }
-    InstanceExample prevInstance = this.lastInstanceRead;
-    this.hitEndOfFile = !readNextInstanceFromFile();
-    return prevInstance;
-  }
-
-  @Override
-  public boolean isRestartable() {
-    return true;
-  }
-
-  @Override
-  public void restart() {
-    try {
-      reset();
-      // this.hitEndOfFile = !readNextInstanceFromFile();
-    } catch (IOException ioe) {
-      throw new RuntimeException("ArffFileStream restart failed.", ioe);
-    }
-  }
-
-  protected boolean readNextInstanceFromFile() {
-    boolean ret;
-    if (!this.hasStarted) {
-      try {
-        reset();
-        ret = getNextInstanceFromFile();
-        this.hitEndOfFile = !ret;
-      } catch (IOException ioe) {
-        throw new RuntimeException("ArffFileStream restart failed.", ioe);
-      }
-      this.hasStarted = true;
-    } else {
-      ret = getNextInstanceFromFile();
-    }
-    return ret;
-  }
-
-  @Override
-  public void getDescription(StringBuilder sb, int indent) {
-    // TODO Auto-generated method stub
-  }
-
-  private void reset() throws IOException {
-    if (this.fileReader != null) {
-      this.fileReader.close();
-    }
-    InputStream fileStream = new 
FileInputStream(this.arffFileOption.getFile());
-    this.fileProgressMonitor = new InputStreamProgressMonitor(
-        fileStream);
-    this.fileReader = new BufferedReader(new InputStreamReader(
-        this.fileProgressMonitor));
-    this.instances = new Instances(this.fileReader, 1, 
this.classIndexOption.getValue());
-    if (this.classIndexOption.getValue() < 0) {
-      this.instances.setClassIndex(this.instances.numAttributes() - 1);
-    } else if (this.classIndexOption.getValue() > 0) {
-      this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
-    }
-    this.numInstancesRead = 0;
-    this.lastInstanceRead = null;
-  }
-
-  private boolean getNextInstanceFromFile() throws RuntimeException {
-    try {
-      if (this.instances.readInstance(this.fileReader)) {
-        this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
-        this.instances.delete(); // keep instances clean
-        this.numInstancesRead++;
-        return true;
-      }
-      if (this.fileReader != null) {
-        this.fileReader.close();
-        this.fileReader = null;
-      }
-      return false;
-    } catch (IOException ioe) {
-      throw new RuntimeException(
-          "ArffFileStream failed to read instance from stream.", ioe);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
deleted file mode 100644
index 74b31dd..0000000
--- a/samoa-api/src/main/java/org/apache/samoa/moa/streams/AvroFileStream.java
+++ /dev/null
@@ -1,167 +0,0 @@
-package org.apache.samoa.moa.streams;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Copyright (C) 2014 - 2015 Apache Software Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.samoa.instances.Instances;
-import org.apache.samoa.moa.core.InstanceExample;
-import org.apache.samoa.moa.core.ObjectRepository;
-import org.apache.samoa.moa.tasks.TaskMonitor;
-import org.apache.samoa.streams.FileStream;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.javacliparser.FileOption;
-import com.github.javacliparser.IntOption;
-import com.github.javacliparser.StringOption;
-
-/**
- * InstanceStream implementation to handle Apache Avro Files. Handles both 
JSON & Binary encoded streams
- * 
- *
- */
-public class AvroFileStream extends FileStream {
-
-  private static final long serialVersionUID = 1L;
-  private static final Logger logger = 
LoggerFactory.getLogger(AvroFileStream.class);
-
-  public FileOption avroFileOption = new FileOption("avroFile", 'f', "Avro 
File(s) to load.", null, null, false);
-  public IntOption classIndexOption = new IntOption("classIndex", 'c',
-      "Class index of data. 0 for none or -1 for last attribute in file.", -1, 
-1, Integer.MAX_VALUE);
-  public StringOption encodingFormatOption = new 
StringOption("encodingFormatOption", 'e',
-      "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY");
-
-  /** Represents the last read Instance **/
-  protected InstanceExample lastInstanceRead;
-
-  /** Represents the binary input stream of avro data **/
-  protected transient InputStream inputStream = null;
-
-  /** The extension to be considered for the files **/
-  private static final String AVRO_FILE_EXTENSION = "avro";
-
-  /* (non-Javadoc)
-   * @see org.apache.samoa.streams.FileStream#reset()
-   * Reset the BINARY encoded Avro Stream & Close the file source
-   */
-  @Override
-  protected void reset() {
-
-    try {
-      if (this.inputStream != null)
-        this.inputStream.close();
-
-      fileSource.reset();
-    } catch (IOException ioException) {
-      logger.error(AVRO_STREAM_FAILED_RESTART_ERROR + " : {}", ioException);
-      throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, 
ioException);
-    }
-
-    if (!getNextFileStream()) {
-      hitEndOfStream = true;
-      throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR);
-    }
-  }
-
-  /**
-   * Get next File Stream & set the class index read from the command line 
option
-   * 
-   * @return
-   */
-  protected boolean getNextFileStream() {
-    if (this.inputStream != null)
-      try {
-        this.inputStream.close();
-      } catch (IOException ioException) {
-        logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException);
-        throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
-      }
-
-    this.inputStream = this.fileSource.getNextInputStream();
-
-    if (this.inputStream == null)
-      return false;
-
-    this.instances = new Instances(this.inputStream, 
classIndexOption.getValue(), encodingFormatOption.getValue());
-
-    if (this.classIndexOption.getValue() < 0) {
-      this.instances.setClassIndex(this.instances.numAttributes() - 1);
-    } else if (this.classIndexOption.getValue() > 0) {
-      this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
-    }
-    return true;
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile()
-   * Read next Instance from File. Return false if unable to read next Instance
-   */
-  @Override
-  protected boolean readNextInstanceFromFile() {
-    try {
-      if (this.instances.readInstance()) {
-        this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
-        this.instances.delete();
-        return true;
-      }
-      if (this.inputStream != null) {
-        this.inputStream.close();
-        this.inputStream = null;
-      }
-      return false;
-    } catch (IOException ioException) {
-      logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException);
-      throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
-    }
-
-  }
-
-  @Override
-  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
-    super.prepareForUseImpl(monitor, repository);
-    String filePath = this.avroFileOption.getFile().getAbsolutePath();
-    this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION);
-    this.lastInstanceRead = null;
-  }
-
-  /* (non-Javadoc)
-   * @see org.apache.samoa.streams.FileStream#getLastInstanceRead()
-   * Return the last read Instance
-   */
-  @Override
-  protected InstanceExample getLastInstanceRead() {
-    return this.lastInstanceRead;
-  }
-
-  @Override
-  public void getDescription(StringBuilder sb, int indent) {
-    throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD);
-  }
-
-  /** Error Messages to for all types of Avro File Streams */
-  protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro 
FileStream restart failed.";
-  protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro 
FileStream is empty.";
-  protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to 
read next instance from Avro File Stream.";
-  protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method 
is not supported for AvroFileStream yet.";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
index 099f639..9f8a322 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/ArffFileStream.java
@@ -33,7 +33,6 @@ import com.github.javacliparser.IntOption;
 /**
  * InstanceStream for ARFF file
  * 
- * @author Casey
  */
 public class ArffFileStream extends FileStream {
 

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java
new file mode 100644
index 0000000..15229a4
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/AvroFileStream.java
@@ -0,0 +1,166 @@
+package org.apache.samoa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+
+/**
+ * InstanceStream implementation to handle Apache Avro Files. Handles both 
JSON & Binary encoded streams
+ * 
+ *
+ */
+public class AvroFileStream extends FileStream {
+
+  private static final long serialVersionUID = 1L;
+  private static final Logger logger = 
LoggerFactory.getLogger(AvroFileStream.class);
+
+  public FileOption avroFileOption = new FileOption("avroFile", 'f', "Avro 
File(s) to load.", null, null, false);
+  public IntOption classIndexOption = new IntOption("classIndex", 'c',
+      "Class index of data. 0 for none or -1 for last attribute in file.", -1, 
-1, Integer.MAX_VALUE);
+  public StringOption encodingFormatOption = new 
StringOption("encodingFormatOption", 'e',
+      "Encoding format for Avro Files. Can be JSON/AVRO", "BINARY");
+
+  /** Represents the last read Instance **/
+  protected InstanceExample lastInstanceRead;
+
+  /** Represents the binary input stream of avro data **/
+  protected transient InputStream inputStream = null;
+
+  /** The extension to be considered for the files **/
+  private static final String AVRO_FILE_EXTENSION = "avro";
+
+  /* (non-Javadoc)
+   * @see org.apache.samoa.streams.FileStream#reset()
+   * Close the current input stream, reset the file source & open the next file stream
+   */
+  @Override
+  protected void reset() {
+
+    try {
+      if (this.inputStream != null)
+        this.inputStream.close();
+
+      fileSource.reset();
+    } catch (IOException ioException) {
+      logger.error(AVRO_STREAM_FAILED_RESTART_ERROR + " : {}", ioException);
+      throw new RuntimeException(AVRO_STREAM_FAILED_RESTART_ERROR, 
ioException);
+    }
+
+    if (!getNextFileStream()) {
+      hitEndOfStream = true;
+      throw new RuntimeException(AVRO_STREAM_EMPTY_STREAM_ERROR);
+    }
+  }
+
+  /**
+   * Get next File Stream & set the class index read from the command line 
option
+   * 
+   * @return true if another input stream was opened, false if the file source returned none
+   */
+  protected boolean getNextFileStream() {
+    if (this.inputStream != null)
+      try {
+        this.inputStream.close();
+      } catch (IOException ioException) {
+        logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException);
+        throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+      }
+
+    this.inputStream = this.fileSource.getNextInputStream();
+
+    if (this.inputStream == null)
+      return false;
+
+    this.instances = new Instances(this.inputStream, 
classIndexOption.getValue(), encodingFormatOption.getValue());
+
+    if (this.classIndexOption.getValue() < 0) {
+      this.instances.setClassIndex(this.instances.numAttributes() - 1);
+    } else if (this.classIndexOption.getValue() > 0) {
+      this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
+    }
+    return true;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.samoa.streams.FileStream#readNextInstanceFromFile()
+   * Read next Instance from File. Return false if unable to read next Instance
+   */
+  @Override
+  protected boolean readNextInstanceFromFile() {
+    try {
+      if (this.instances.readInstance()) {
+        this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
+        this.instances.delete();
+        return true;
+      }
+      if (this.inputStream != null) {
+        this.inputStream.close();
+        this.inputStream = null;
+      }
+      return false;
+    } catch (IOException ioException) {
+      logger.error(AVRO_STREAM_FAILED_READ_ERROR + " : {}", ioException);
+      throw new RuntimeException(AVRO_STREAM_FAILED_READ_ERROR, ioException);
+    }
+
+  }
+
+  @Override
+  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
+    super.prepareForUseImpl(monitor, repository);
+    String filePath = this.avroFileOption.getFile().getAbsolutePath();
+    this.fileSource.init(filePath, AvroFileStream.AVRO_FILE_EXTENSION);
+    this.lastInstanceRead = null;
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.samoa.streams.FileStream#getLastInstanceRead()
+   * Return the last read Instance
+   */
+  @Override
+  protected InstanceExample getLastInstanceRead() {
+    return this.lastInstanceRead;
+  }
+
+  @Override
+  public void getDescription(StringBuilder sb, int indent) {
+    throw new UnsupportedOperationException(AVRO_STREAM_UNSUPPORTED_METHOD);
+  }
+
+  /** Error messages for all types of Avro File Streams */
+  protected static final String AVRO_STREAM_FAILED_RESTART_ERROR = "Avro 
FileStream restart failed.";
+  protected static final String AVRO_STREAM_EMPTY_STREAM_ERROR = "Avro 
FileStream is empty.";
+  protected static final String AVRO_STREAM_FAILED_READ_ERROR = "Failed to 
read next instance from Avro File Stream.";
+  protected static final String AVRO_STREAM_UNSUPPORTED_METHOD = "This method 
is not supported for AvroFileStream yet.";
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/dc2b7bc3/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java
index 00abd1a..67d5e79 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/fs/HDFSFileStreamSource.java
@@ -40,8 +40,8 @@ import org.apache.hadoop.io.IOUtils;
 public class HDFSFileStreamSource implements FileStreamSource {
 
   /**
-        * 
-        */
+   * 
+   */
   private static final long serialVersionUID = -3887354805787167400L;
 
   private transient InputStream fileStream;
@@ -59,6 +59,10 @@ public class HDFSFileStreamSource implements 
FileStreamSource {
 
   public void init(Configuration config, String path, String ext) {
     this.config = config;
+    config.set("fs.hdfs.impl",
+        org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+    config.set("fs.file.impl",
+        org.apache.hadoop.fs.LocalFileSystem.class.getName());
     this.filePaths = new ArrayList<String>();
     Path hdfsPath = new Path(path);
     FileSystem fs;
@@ -69,8 +73,7 @@ public class HDFSFileStreamSource implements FileStreamSource 
{
         Path filterPath = hdfsPath;
         if (ext != null) {
           filterPath = new Path(path.toString(), "*." + ext);
-        }
-        else {
+        } else {
           filterPath = new Path(path.toString(), "*");
         }
         FileStatus[] filesInDir = fs.globStatus(filterPath);
@@ -79,8 +82,7 @@ public class HDFSFileStreamSource implements FileStreamSource 
{
             filePaths.add(filesInDir[i].getPath().toString());
           }
         }
-      }
-      else {
+      } else {
         this.filePaths.add(path);
       }
     } catch (IOException ioe) {

Reply via email to