http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/options/AbstractClassOption.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/options/AbstractClassOption.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/options/AbstractClassOption.java
new file mode 100644
index 0000000..3230235
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/options/AbstractClassOption.java
@@ -0,0 +1,254 @@
+package org.apache.samoa.moa.options;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.tasks.Task;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import com.github.javacliparser.AbstractOption;
+import com.github.javacliparser.SerializeUtils;
+
+/**
+ * Base class for options whose value is an object of a required class type.
+ * <p>
+ * Besides an instance of the required type, the current value may be a {@link String} (resolved by name against an
+ * {@link ObjectRepository}), a {@link File} (read back with {@code SerializeUtils.readFromFile}) or a {@link Task}
+ * whose result type is assignable to the required type (run to obtain the object). See
+ * {@link #materializeObject(TaskMonitor, ObjectRepository)} for how each form is turned into the final object.
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision$
+ */
+public abstract class AbstractClassOption extends AbstractOption {
+
+  private static final long serialVersionUID = 1L;
+
+  /** The prefix text to use to indicate file. */
+  public static final String FILE_PREFIX_STRING = "file:";
+
+  /** The prefix text to use to indicate inmem. */
+  public static final String INMEM_PREFIX_STRING = "inmem:";
+
+  /** The current (not yet materialized) value of this option. */
+  protected Object currentValue;
+
+  /** The class type that the materialized object must have. */
+  protected Class<?> requiredType;
+
+  /** The default command line interface text. */
+  protected String defaultCLIString;
+
+  /** The text that stands for "no object"; {@code null} when a null value is not allowed. */
+  protected String nullString;
+
+  /**
+   * Creates a new option without a null text, i.e. one that does not accept {@code null} through
+   * {@link #setCurrentObject(Object)}.
+   *
+   * @param name
+   *          the name of this option
+   * @param cliChar
+   *          the command line interface text
+   * @param purpose
+   *          the text describing the purpose of this option
+   * @param requiredType
+   *          the class type
+   * @param defaultCLIString
+   *          the default command line interface text
+   */
+  public AbstractClassOption(String name, char cliChar, String purpose,
+      Class<?> requiredType, String defaultCLIString) {
+    this(name, cliChar, purpose, requiredType, defaultCLIString, null);
+  }
+
+  /**
+   * Creates a new option and resets it to its default value.
+   *
+   * @param name
+   *          the name of this option
+   * @param cliChar
+   *          the command line interface text
+   * @param purpose
+   *          the text describing the purpose of this option
+   * @param requiredType
+   *          the class type
+   * @param defaultCLIString
+   *          the default command line interface text
+   * @param nullString
+   *          the null text, or {@code null} if a null value is not allowed
+   */
+  public AbstractClassOption(String name, char cliChar, String purpose,
+      Class<?> requiredType, String defaultCLIString, String nullString) {
+    super(name, cliChar, purpose);
+    this.requiredType = requiredType;
+    this.defaultCLIString = defaultCLIString;
+    this.nullString = nullString;
+    // resetToDefault() is not declared in this class (presumably inherited from AbstractOption); the fields it may
+    // rely on are therefore assigned first.
+    resetToDefault();
+  }
+
+  /**
+   * Sets the current object.
+   * <p>
+   * Accepted values are: {@code null} (only when a null text was configured), an instance of the required type, any
+   * {@link String}, any {@link File}, or a {@link Task} whose result type is assignable to the required type.
+   *
+   * @param obj
+   *          the object to set as current
+   * @throws IllegalArgumentException
+   *           if {@code obj} is none of the accepted forms
+   */
+  public void setCurrentObject(Object obj) {
+    boolean nullAllowed = (obj == null) && (this.nullString != null);
+    boolean compatibleTask = (obj instanceof Task)
+        && this.requiredType.isAssignableFrom(((Task) obj).getTaskResultType());
+    if (nullAllowed
+        || this.requiredType.isInstance(obj)
+        || (obj instanceof String)
+        || (obj instanceof File)
+        || compatibleTask) {
+      this.currentValue = obj;
+    } else {
+      throw new IllegalArgumentException("Object not of required type.");
+    }
+  }
+
+  /**
+   * Returns the current value exactly as stored, without materializing it.
+   *
+   * @return the current object
+   */
+  public Object getPreMaterializedObject() {
+    return this.currentValue;
+  }
+
+  /**
+   * Gets the class type of this option.
+   *
+   * @return the class type of this option
+   */
+  public Class<?> getRequiredType() {
+    return this.requiredType;
+  }
+
+  /**
+   * Gets the null string of this option.
+   *
+   * @return the null string of this option
+   */
+  public String getNullString() {
+    return this.nullString;
+  }
+
+  /**
+   * Gets a materialized object of this option.
+   * <ul>
+   * <li>{@code null} or an instance of the required type is returned as is;</li>
+   * <li>a {@link String} is looked up by name in {@code repository};</li>
+   * <li>a {@link Task} is executed and its result returned;</li>
+   * <li>a {@link File} is read with {@code SerializeUtils.readFromFile}.</li>
+   * </ul>
+   *
+   * @param monitor
+   *          the task monitor to use
+   * @param repository
+   *          the object repository to use
+   * @return the materialized object
+   * @throws RuntimeException
+   *           if the repository is missing or has no object of the given name, if the file cannot be read, or if the
+   *           current value has an unsupported type
+   */
+  public Object materializeObject(TaskMonitor monitor,
+      ObjectRepository repository) {
+    if ((this.currentValue == null)
+        || this.requiredType.isInstance(this.currentValue)) {
+      return this.currentValue;
+    }
+    if (this.currentValue instanceof String) {
+      if (repository == null) {
+        throw new RuntimeException("No object repository available.");
+      }
+      Object inmemObj = repository.getObjectNamed((String) this.currentValue);
+      if (inmemObj == null) {
+        throw new RuntimeException("No object named "
+            + this.currentValue + " found in repository.");
+      }
+      return inmemObj;
+    }
+    if (this.currentValue instanceof Task) {
+      return ((Task) this.currentValue).doTask(monitor, repository);
+    }
+    if (this.currentValue instanceof File) {
+      File inputFile = (File) this.currentValue;
+      try {
+        return SerializeUtils.readFromFile(inputFile);
+      } catch (Exception ex) {
+        throw new RuntimeException("Problem loading "
+            + this.requiredType.getName() + " object from file '"
+            + inputFile.getName() + "':\n" + ex.getMessage(), ex);
+      }
+    }
+    throw new RuntimeException(
+        "Could not materialize object of required type "
+            + this.requiredType.getName() + ", found "
+            + this.currentValue.getClass().getName()
+            + " instead.");
+  }
+
+  @Override
+  public String getDefaultCLIString() {
+    return this.defaultCLIString;
+  }
+
+  /**
+   * Gets the command line interface text of the class: its name with the package of {@code requiredType} removed, or,
+   * failing that and if the class is a {@link Task}, with the package of {@link Task} removed.
+   *
+   * @param aClass
+   *          the class
+   * @param requiredType
+   *          the class type
+   * @return the command line interface text of the class
+   */
+  public static String classToCLIString(Class<?> aClass, Class<?> requiredType) {
+    String className = aClass.getName();
+    String shortName = stripPackagePrefix(className, requiredType);
+    // A successful strip always shortens the name, so equality means "prefix did not match".
+    if (shortName.equals(className) && Task.class.isAssignableFrom(aClass)) {
+      shortName = stripPackagePrefix(className, Task.class);
+    }
+    return shortName;
+  }
+
+  @Override
+  public abstract String getValueAsCLIString();
+
+  @Override
+  public abstract void setValueViaCLIString(String s);
+
+  /**
+   * Gets the class name without the package name of {@code expectedType} as prefix.
+   * <p>
+   * The prefix is only removed on a package boundary (package name followed by {@code '.'}), so a class in a package
+   * that merely shares leading characters with the expected package is left untouched. The name is also returned
+   * unchanged when {@code expectedType} has no package or lives in the unnamed package.
+   *
+   * @param className
+   *          the name of the class
+   * @param expectedType
+   *          the type of the class
+   * @return the class name without its package name prefix
+   */
+  public static String stripPackagePrefix(String className, Class<?> expectedType) {
+    // getPackage() may return null, e.g. for primitive and array types.
+    Package expectedPackage = expectedType.getPackage();
+    if ((expectedPackage == null) || expectedPackage.getName().isEmpty()) {
+      return className;
+    }
+    String prefix = expectedPackage.getName() + ".";
+    if (className.startsWith(prefix)) {
+      return className.substring(prefix.length());
+    }
+    return className;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/options/AbstractOptionHandler.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/options/AbstractOptionHandler.java
 
b/samoa-api/src/main/java/org/apache/samoa/moa/options/AbstractOptionHandler.java
new file mode 100644
index 0000000..6acad43
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/options/AbstractOptionHandler.java
@@ -0,0 +1,167 @@
+package org.apache.samoa.moa.options;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.moa.AbstractMOAObject;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.tasks.NullMonitor;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import com.github.javacliparser.Options;
+
+/**
+ * Abstract Option Handler. All classes that have options in MOA extend this class.
+ * <p>
+ * Option handling is delegated to an {@link OptionsHandler} stored in {@link #config}, which is created on first use.
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public abstract class AbstractOptionHandler extends AbstractMOAObject implements OptionHandler {
+
+  private static final long serialVersionUID = 1L;
+
+  /** Delegate that holds the options of this object; {@code null} until first needed. */
+  protected OptionsHandler config;
+
+  @Override
+  public String getPurposeString() {
+    return "Anonymous object: purpose undocumented.";
+  }
+
+  /**
+   * Returns the options of this object, creating the {@link #config} delegate if it does not exist yet. The delegate
+   * is not prepared for use here.
+   */
+  @Override
+  public Options getOptions() {
+    return ensureConfig().getOptions();
+  }
+
+  @Override
+  public void prepareForUse() {
+    prepareForUse(new NullMonitor(), null);
+  }
+
+  /**
+   * Prepares this object for use and then calls {@link #prepareForUseImpl(TaskMonitor, ObjectRepository)}.
+   * <p>
+   * The {@link #config} delegate is prepared with the given monitor and repository only when it is created by this
+   * call. If it already exists (for example because {@link #getOptions()} was called earlier) it is left as is, and
+   * class options are then prepared on demand by {@link OptionsHandler#getPreparedClassOption(ClassOption)}.
+   */
+  @Override
+  public void prepareForUse(TaskMonitor monitor, ObjectRepository repository) {
+    if (this.config == null) {
+      this.config = new OptionsHandler(this, "");
+      this.config.prepareForUse(monitor, repository);
+    }
+    prepareForUseImpl(monitor, repository);
+  }
+
+  /**
+   * This method describes the implementation of how to prepare this object for use. All classes that extends this
+   * class have to implement <code>prepareForUseImpl</code> and not <code>prepareForUse</code> since
+   * <code>prepareForUse</code> calls <code>prepareForUseImpl</code>.
+   *
+   * @param monitor
+   *          the TaskMonitor to use
+   * @param repository
+   *          the ObjectRepository to use
+   */
+  protected abstract void prepareForUseImpl(TaskMonitor monitor,
+      ObjectRepository repository);
+
+  @Override
+  public String getCLICreationString(Class<?> expectedType) {
+    String shortName = ClassOption.stripPackagePrefix(this.getClass().getName(), expectedType);
+    return shortName + " " + getOptions().getAsCLIString();
+  }
+
+  @Override
+  public OptionHandler copy() {
+    return (OptionHandler) super.copy();
+  }
+
+  /**
+   * Prepares the options of this class.
+   *
+   * @param monitor
+   *          the TaskMonitor to use
+   * @param repository
+   *          the ObjectRepository to use
+   */
+  protected void prepareClassOptions(TaskMonitor monitor,
+      ObjectRepository repository) {
+    // Go through ensureConfig() so that calling this before getOptions()/prepareForUse() does not fail on a null
+    // delegate.
+    ensureConfig().prepareClassOptions(monitor, repository);
+  }
+
+  /**
+   * Gets a prepared option of this class.
+   *
+   * @param opt
+   *          the class option to get
+   * @return an option stored in the dictionary
+   */
+  protected Object getPreparedClassOption(ClassOption opt) {
+    return ensureConfig().getPreparedClassOption(opt);
+  }
+
+  /**
+   * Returns the {@link #config} delegate, creating it (without preparing it) when it does not exist yet.
+   *
+   * @return the non-null options delegate of this object
+   */
+  private OptionsHandler ensureConfig() {
+    if (this.config == null) {
+      this.config = new OptionsHandler(this, "");
+    }
+    return this.config;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/options/ClassOption.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/options/ClassOption.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/options/ClassOption.java
new file mode 100644
index 0000000..1d9fdfe
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/options/ClassOption.java
@@ -0,0 +1,178 @@
+package org.apache.samoa.moa.options;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+
+import org.apache.samoa.moa.options.OptionHandler;
+import org.apache.samoa.moa.tasks.Task;
+
+import com.github.javacliparser.Option;
+import com.github.javacliparser.Options;
+
+/**
+ * Class option: an {@link AbstractClassOption} whose command line text is either a {@code file:} reference, an
+ * {@code inmem:} reference, or a class name optionally followed by the options of that class.
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public class ClassOption extends AbstractClassOption {
+
+  private static final long serialVersionUID = 1L;
+
+  public ClassOption(String name, char cliChar, String purpose,
+      Class<?> requiredType, String defaultCLIString) {
+    super(name, cliChar, purpose, requiredType, defaultCLIString);
+  }
+
+  public ClassOption(String name, char cliChar, String purpose,
+      Class<?> requiredType, String defaultCLIString, String nullString) {
+    super(name, cliChar, purpose, requiredType, defaultCLIString, nullString);
+  }
+
+  /**
+   * Returns the null text when the value is {@code null} and a null text is configured, otherwise the text produced
+   * by {@link #objectToCLIString(Object, Class)} for the current value.
+   */
+  @Override
+  public String getValueAsCLIString() {
+    if ((this.currentValue == null) && (this.nullString != null)) {
+      return this.nullString;
+    }
+    return objectToCLIString(this.currentValue, this.requiredType);
+  }
+
+  /**
+   * Sets the value from its command line text. When a null text is configured, a {@code null} or empty string or the
+   * null text itself clears the value; everything else is parsed with
+   * {@link #cliStringToObject(String, Class, Option[])}.
+   *
+   * @throws IllegalArgumentException
+   *           if the text cannot be converted; the original failure is kept as the cause
+   */
+  @Override
+  public void setValueViaCLIString(String s) {
+    boolean denotesNull = (this.nullString != null)
+        && ((s == null) || (s.length() == 0) || s.equals(this.nullString));
+    if (denotesNull) {
+      this.currentValue = null;
+      return;
+    }
+    try {
+      this.currentValue = cliStringToObject(s, this.requiredType, null);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Problems with option: " + getName(), e);
+    }
+  }
+
+  /**
+   * Converts an option value to its command line text.
+   *
+   * @param obj
+   *          the value; may be {@code null}
+   * @param requiredType
+   *          the type whose package is stripped from the class name
+   * @return the empty string for {@code null}, a {@code file:}/{@code inmem:} reference for a {@link File} or
+   *         {@link String}, otherwise the shortened class name followed by the object's own options, if any
+   */
+  public static String objectToCLIString(Object obj, Class<?> requiredType) {
+    if (obj == null) {
+      return "";
+    }
+    if (obj instanceof File) {
+      return (FILE_PREFIX_STRING + ((File) obj).getPath());
+    }
+    if (obj instanceof String) {
+      return (INMEM_PREFIX_STRING + obj);
+    }
+    String className = classToCLIString(obj.getClass(), requiredType);
+    if (obj instanceof OptionHandler) {
+      String subOptions = ((OptionHandler) obj).getOptions().getAsCLIString();
+      if (subOptions.length() > 0) {
+        return (className + " " + subOptions);
+      }
+    }
+    return className;
+  }
+
+  /**
+   * Converts a command line text to an option value.
+   * <p>
+   * A text starting with {@code file:} or {@code inmem:} yields a {@link File} or {@link String} respectively.
+   * Otherwise the first word is taken as a class name (tried as given, then in the package of {@code requiredType},
+   * then in the package of {@link Task}), the class is instantiated through its no-argument constructor, and the
+   * remainder of the text is applied as options to the new instance.
+   *
+   * @param cliString
+   *          the command line text
+   * @param requiredType
+   *          the type the created object (or, for a task, its result) must be compatible with
+   * @param externalOptions
+   *          additional options accepted while parsing; may be {@code null}
+   * @return the file, the in-memory name, or the configured new instance
+   * @throws Exception
+   *           if the class cannot be found or instantiated, is not compatible with {@code requiredType}, or its
+   *           options cannot be parsed
+   */
+  public static Object cliStringToObject(String cliString,
+      Class<?> requiredType, Option[] externalOptions) throws Exception {
+    if (cliString.startsWith(FILE_PREFIX_STRING)) {
+      return new File(cliString.substring(FILE_PREFIX_STRING.length()));
+    }
+    if (cliString.startsWith(INMEM_PREFIX_STRING)) {
+      return cliString.substring(INMEM_PREFIX_STRING.length());
+    }
+    cliString = cliString.trim();
+    int firstSpaceIndex = cliString.indexOf(' ', 0);
+    String className;
+    String classOptions;
+    if (firstSpaceIndex > 0) {
+      className = cliString.substring(0, firstSpaceIndex);
+      classOptions = cliString.substring(firstSpaceIndex + 1).trim();
+    } else {
+      className = cliString;
+      classOptions = "";
+    }
+    Class<?> classObject = resolveClass(className, requiredType);
+    Object classInstance;
+    try {
+      classInstance = classObject.newInstance();
+    } catch (Exception ex) {
+      throw new Exception("Problem creating instance of class: "
+          + className, ex);
+    }
+    boolean compatible = requiredType.isInstance(classInstance)
+        || ((classInstance instanceof Task)
+            && requiredType.isAssignableFrom(((Task) classInstance).getTaskResultType()));
+    if (!compatible) {
+      throw new Exception("Class named '" + className
+          + "' is not an instance of " + requiredType.getName() + ".");
+    }
+    Options options = new Options();
+    if (externalOptions != null) {
+      for (Option option : externalOptions) {
+        options.addOption(option);
+      }
+    }
+    if (classInstance instanceof OptionHandler) {
+      Option[] objectOptions = ((OptionHandler) classInstance).getOptions().getOptionArray();
+      for (Option option : objectOptions) {
+        options.addOption(option);
+      }
+    }
+    try {
+      options.setViaCLIString(classOptions);
+    } catch (Exception ex) {
+      StringBuilder message = new StringBuilder();
+      message.append("Problem with options to '").append(className).append("'.");
+      // Only an OptionHandler can list its valid options; casting unconditionally would replace the real parsing
+      // error with a ClassCastException for any other kind of instance.
+      if (classInstance instanceof OptionHandler) {
+        message.append("\n\nValid options for ").append(className).append(":\n")
+            .append(((OptionHandler) classInstance).getOptions().getHelpString());
+      }
+      throw new Exception(message.toString(), ex);
+    } finally {
+      options.removeAllOptions(); // clean up listener refs
+    }
+    return classInstance;
+  }
+
+  /**
+   * Loads the class for {@code className}, trying the name as given, then prefixed with the package of
+   * {@code requiredType}, then prefixed with the package of {@link Task}.
+   *
+   * @param className
+   *          the (possibly package-less) class name
+   * @param requiredType
+   *          the type whose package is used as default package
+   * @return the first class that could be loaded
+   * @throws Exception
+   *           if none of the candidates can be loaded; the failure for the name as given is kept as the cause
+   */
+  private static Class<?> resolveClass(String className, Class<?> requiredType) throws Exception {
+    Throwable firstFailure;
+    // Throwable is caught on purpose: besides ClassNotFoundException, loading may fail with a LinkageError (e.g.
+    // NoClassDefFoundError), and a missing package yields a NullPointerException; all of them mean "try the next
+    // candidate".
+    try {
+      return Class.forName(className);
+    } catch (Throwable t1) {
+      firstFailure = t1;
+    }
+    try {
+      // try prepending default package
+      return Class.forName(requiredType.getPackage().getName() + "." + className);
+    } catch (Throwable ignored) {
+      // fall through to the task package
+    }
+    try {
+      // try prepending task package
+      return Class.forName(Task.class.getPackage().getName() + "." + className);
+    } catch (Throwable ignored) {
+      // no candidate left
+    }
+    throw new Exception("Class not found: " + className, firstFailure);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/options/OptionHandler.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/options/OptionHandler.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/options/OptionHandler.java
new file mode 100644
index 0000000..0af7172
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/options/OptionHandler.java
@@ -0,0 +1,82 @@
+package org.apache.samoa.moa.options;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.moa.MOAObject;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.Options;
+
+/**
+ * Contract for objects that are configured through options or parameters.
+ *
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public interface OptionHandler extends MOAObject, Configurable {
+
+  /**
+   * Describes what this object is for.
+   *
+   * @return the string with the purpose of this object
+   */
+  String getPurposeString();
+
+  /**
+   * Gives access to the options of this object.
+   *
+   * @return the options of this object
+   */
+  Options getOptions();
+
+  /**
+   * Prepares this object for use without a caller-supplied monitor or repository.
+   */
+  void prepareForUse();
+
+  /**
+   * Prepares this object for use.
+   *
+   * @param monitor
+   *          the TaskMonitor to use
+   * @param repository
+   *          the ObjectRepository to use
+   */
+  void prepareForUse(TaskMonitor monitor, ObjectRepository repository);
+
+  /**
+   * Produces a copy of this object.
+   *
+   * @return a copy of this object
+   */
+  @Override
+  OptionHandler copy();
+
+  /**
+   * Builds the Command Line Interface text that would create this object.
+   *
+   * @param expectedType
+   *          the type the text is produced for
+   * @return the Command Line Interface text to create the object
+   */
+  String getCLICreationString(Class<?> expectedType);
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/options/OptionsHandler.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/options/OptionsHandler.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/options/OptionsHandler.java
new file mode 100644
index 0000000..cbe047f
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/options/OptionsHandler.java
@@ -0,0 +1,184 @@
+package org.apache.samoa.moa.options;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.HashMap;
+
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.tasks.NullMonitor;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import com.github.javacliparser.JavaCLIParser;
+import com.github.javacliparser.Option;
+
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+
+/**
+ * {@link JavaCLIParser} that additionally materializes and prepares the {@link ClassOption}s among its options.
+ * <p>
+ * The prepared objects are kept in {@code classOptionNamesToPreparedObjects}, keyed by option name. That field is not
+ * declared in this class (presumably inherited from {@link JavaCLIParser}).
+ *
+ * @author abifet
+ */
+public class OptionsHandler extends JavaCLIParser {
+
+  /**
+   * Creates a handler by passing both arguments on to the {@link JavaCLIParser} constructor.
+   *
+   * @param c
+   *          the object whose options are handled
+   * @param cliString
+   *          the command line text for the options
+   */
+  public OptionsHandler(Object c, String cliString) {
+    super(c, cliString);
+  }
+
+  /**
+   * Prepares the class options with a {@link NullMonitor} and no object repository.
+   */
+  public void prepareForUse() {
+    prepareForUse(new NullMonitor(), null);
+  }
+
+  /**
+   * Prepares the class options; equivalent to {@link #prepareClassOptions(TaskMonitor, ObjectRepository)}.
+   *
+   * @param monitor
+   *          the TaskMonitor to use
+   * @param repository
+   *          the ObjectRepository to use
+   */
+  public void prepareForUse(TaskMonitor monitor, ObjectRepository repository) {
+    prepareClassOptions(monitor, repository);
+  }
+
+  /**
+   * Prepares the options of this class.
+   * <p>
+   * Every {@link ClassOption} is materialized; if the result is an {@link OptionHandler} it is prepared for use as
+   * well. The results are stored by option name. Any previously stored results are discarded first. Processing stops
+   * early, keeping what was stored so far, as soon as the monitor reports that the task should abort.
+   *
+   * @param monitor
+   *          the TaskMonitor to use
+   * @param repository
+   *          the ObjectRepository to use
+   */
+  public void prepareClassOptions(TaskMonitor monitor,
+      ObjectRepository repository) {
+    this.classOptionNamesToPreparedObjects = null;
+    Option[] optionArray = getOptions().getOptionArray();
+    for (Option option : optionArray) {
+      if (!(option instanceof ClassOption)) {
+        continue;
+      }
+      ClassOption classOption = (ClassOption) option;
+      monitor.setCurrentActivity("Materializing option "
+          + classOption.getName() + "...", -1.0);
+      Object optionObj = classOption.materializeObject(monitor, repository);
+      if (monitor.taskShouldAbort()) {
+        return;
+      }
+      if (optionObj instanceof OptionHandler) {
+        monitor.setCurrentActivity("Preparing option "
+            + classOption.getName() + "...", -1.0);
+        ((OptionHandler) optionObj).prepareForUse(monitor, repository);
+        if (monitor.taskShouldAbort()) {
+          return;
+        }
+      }
+      // The map is created only once there is something to store, so it stays null when there are no class options.
+      if (this.classOptionNamesToPreparedObjects == null) {
+        this.classOptionNamesToPreparedObjects = new HashMap<String, Object>();
+      }
+      this.classOptionNamesToPreparedObjects.put(option.getName(), optionObj);
+    }
+  }
+
+  /**
+   * Gets a prepared option of this class, preparing the class options first if nothing is stored yet.
+   *
+   * @param opt
+   *          the class option to get
+   * @return the object stored for the option, or {@code null} if none is stored
+   */
+  public Object getPreparedClassOption(ClassOption opt) {
+    if (this.classOptionNamesToPreparedObjects == null) {
+      this.prepareForUse();
+    }
+    // Preparation leaves the map null when no class option was stored (none present, or aborted early).
+    if (this.classOptionNamesToPreparedObjects == null) {
+      return null;
+    }
+    return this.classOptionNamesToPreparedObjects.get(opt.getName());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java
new file mode 100644
index 0000000..3a17d61
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/streams/ArffFileStream.java
@@ -0,0 +1,200 @@
+package org.apache.samoa.moa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedReader;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.moa.core.InputStreamProgressMonitor;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.options.AbstractOptionHandler;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import com.github.javacliparser.FileOption;
+import com.github.javacliparser.IntOption;
+
+/**
+ * Stream reader of ARFF files.
+ * 
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public class ArffFileStream extends AbstractOptionHandler implements 
InstanceStream {
+
+  @Override
+  public String getPurposeString() {
+    return "A stream read from an ARFF file.";
+  }
+
+  private static final long serialVersionUID = 1L;
+
+  public FileOption arffFileOption = new FileOption("arffFile", 'f',
+      "ARFF file to load.", null, "arff", false);
+
+  public IntOption classIndexOption = new IntOption(
+      "classIndex",
+      'c',
+      "Class index of data. 0 for none or -1 for last attribute in file.",
+      -1, -1, Integer.MAX_VALUE);
+
+  protected Instances instances;
+
+  transient protected Reader fileReader;
+
+  protected boolean hitEndOfFile;
+
+  protected InstanceExample lastInstanceRead;
+
+  protected int numInstancesRead;
+
+  transient protected InputStreamProgressMonitor fileProgressMonitor;
+
+  protected boolean hasStarted;
+
+  public ArffFileStream() {
+  }
+
+  public ArffFileStream(String arffFileName, int classIndex) {
+    this.arffFileOption.setValue(arffFileName);
+    this.classIndexOption.setValue(classIndex);
+    this.hasStarted = false;
+    restart();
+  }
+
+  @Override
+  public void prepareForUseImpl(TaskMonitor monitor,
+      ObjectRepository repository) {
+    // restart();
+    this.hasStarted = false;
+    this.lastInstanceRead = null;
+  }
+
+  @Override
+  public InstancesHeader getHeader() {
+    return new InstancesHeader(this.instances);
+  }
+
+  @Override
+  public long estimatedRemainingInstances() {
+    double progressFraction = this.fileProgressMonitor.getProgressFraction();
+    if ((progressFraction > 0.0) && (this.numInstancesRead > 0)) {
+      return (long) ((this.numInstancesRead / progressFraction) - 
this.numInstancesRead);
+    }
+    return -1;
+  }
+
+  @Override
+  public boolean hasMoreInstances() {
+    return !this.hitEndOfFile;
+  }
+
+  @Override
+  public InstanceExample nextInstance() {
+    if (this.lastInstanceRead == null) {
+      readNextInstanceFromFile();
+    }
+    InstanceExample prevInstance = this.lastInstanceRead;
+    this.hitEndOfFile = !readNextInstanceFromFile();
+    return prevInstance;
+  }
+
+  @Override
+  public boolean isRestartable() {
+    return true;
+  }
+
+  @Override
+  public void restart() {
+    try {
+      reset();
+      // this.hitEndOfFile = !readNextInstanceFromFile();
+    } catch (IOException ioe) {
+      throw new RuntimeException("ArffFileStream restart failed.", ioe);
+    }
+  }
+
+  protected boolean readNextInstanceFromFile() {
+    boolean ret;
+    if (!this.hasStarted) {
+      try {
+        reset();
+        ret = getNextInstanceFromFile();
+        this.hitEndOfFile = !ret;
+      } catch (IOException ioe) {
+        throw new RuntimeException("ArffFileStream restart failed.", ioe);
+      }
+      this.hasStarted = true;
+    } else {
+      ret = getNextInstanceFromFile();
+    }
+    return ret;
+  }
+
+  @Override
+  public void getDescription(StringBuilder sb, int indent) {
+    // TODO Auto-generated method stub
+  }
+
+  private void reset() throws IOException {
+    if (this.fileReader != null) {
+      this.fileReader.close();
+    }
+    InputStream fileStream = new 
FileInputStream(this.arffFileOption.getFile());
+    this.fileProgressMonitor = new InputStreamProgressMonitor(
+        fileStream);
+    this.fileReader = new BufferedReader(new InputStreamReader(
+        this.fileProgressMonitor));
+    this.instances = new Instances(this.fileReader, 1, 
this.classIndexOption.getValue());
+    if (this.classIndexOption.getValue() < 0) {
+      this.instances.setClassIndex(this.instances.numAttributes() - 1);
+    } else if (this.classIndexOption.getValue() > 0) {
+      this.instances.setClassIndex(this.classIndexOption.getValue() - 1);
+    }
+    this.numInstancesRead = 0;
+    this.lastInstanceRead = null;
+  }
+
+  private boolean getNextInstanceFromFile() throws RuntimeException {
+    try {
+      if (this.instances.readInstance(this.fileReader)) {
+        this.lastInstanceRead = new 
InstanceExample(this.instances.instance(0));
+        this.instances.delete(); // keep instances clean
+        this.numInstancesRead++;
+        return true;
+      }
+      if (this.fileReader != null) {
+        this.fileReader.close();
+        this.fileReader = null;
+      }
+      return false;
+    } catch (IOException ioe) {
+      throw new RuntimeException(
+          "ArffFileStream failed to read instance from stream.", ioe);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/streams/ExampleStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/ExampleStream.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/ExampleStream.java
new file mode 100644
index 0000000..c4ab2df
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/streams/ExampleStream.java
@@ -0,0 +1,76 @@
+package org.apache.samoa.moa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.moa.MOAObject;
+import org.apache.samoa.moa.core.Example;
+
+/**
+ * Interface representing a data stream of examples.
+ * 
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public interface ExampleStream<E extends Example> extends MOAObject {
+
+  /**
+   * Returns the header of this stream, which describes its attributes and
+   * classes.
+   * 
+   * @return the header of this stream
+   */
+  InstancesHeader getHeader();
+
+  /**
+   * Returns an estimate of how many instances are still to come.
+   * 
+   * @return the estimated number of instances to get from this stream
+   */
+  long estimatedRemainingInstances();
+
+  /**
+   * Tells whether this stream can still produce instances. This is useful
+   * when reading streams from files.
+   * 
+   * @return true if this stream has more instances to output
+   */
+  boolean hasMoreInstances();
+
+  /**
+   * Returns the next example of this stream.
+   * 
+   * @return the next example of this stream
+   */
+  E nextInstance();
+
+  /**
+   * Tells whether this stream supports {@link #restart()}.
+   * 
+   * @return true if this stream can restart
+   */
+  boolean isRestartable();
+
+  /**
+   * Restarts this stream. It must be similar to starting a new stream from
+   * scratch.
+   */
+  void restart();
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/streams/InstanceStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/InstanceStream.java 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/InstanceStream.java
new file mode 100644
index 0000000..fc6ed8e
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/moa/streams/InstanceStream.java
@@ -0,0 +1,34 @@
+package org.apache.samoa.moa.streams;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.moa.core.Example;
+
+/**
+ * Interface representing a data stream of instances.
+ * <p>
+ * It fixes the example type of {@link ExampleStream} to
+ * {@code Example<Instance>} and declares no methods of its own.
+ * 
+ * @author Richard Kirkby ([email protected])
+ * @version $Revision: 7 $
+ */
+public interface InstanceStream extends ExampleStream<Example<Instance>> {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusterEvent.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusterEvent.java
 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusterEvent.java
new file mode 100644
index 0000000..9aa1168
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusterEvent.java
@@ -0,0 +1,49 @@
+package org.apache.samoa.moa.streams.clustering;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.EventObject;
+
+/**
+ * Immutable event describing a change in a cluster, carrying a type label, a
+ * message and the timestamp at which it occurred.
+ */
+public class ClusterEvent extends EventObject {
+
+  // EventObject is Serializable; pin the version like the other classes in this code base.
+  private static final long serialVersionUID = 1L;
+
+  private final String type;
+  private final String message;
+  private final long timestamp;
+
+  /**
+   * Creates a cluster event.
+   * 
+   * @param source
+   *          the object on which the event occurred
+   * @param timestamp
+   *          the timestamp of the event
+   * @param type
+   *          the type label of the event
+   * @param message
+   *          the message describing the event
+   * @throws IllegalArgumentException
+   *           if {@code source} is null
+   */
+  public ClusterEvent(Object source, long timestamp, String type, String message) {
+    super(source);
+    this.type = type;
+    this.message = message;
+    this.timestamp = timestamp;
+  }
+
+  /**
+   * @return the message describing the event
+   */
+  public String getMessage() {
+    return message;
+  }
+
+  /**
+   * @return the timestamp of the event
+   */
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  /**
+   * @return the type label of the event
+   */
+  public String getType() {
+    return type;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusterEventListener.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusterEventListener.java
 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusterEventListener.java
new file mode 100644
index 0000000..af7881b
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusterEventListener.java
@@ -0,0 +1,29 @@
+package org.apache.samoa.moa.streams.clustering;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.EventListener;
+
+/**
+ * Listener that is handed {@link ClusterEvent}s.
+ */
+public interface ClusterEventListener extends EventListener {
+
+  /**
+   * Callback receiving a cluster event.
+   * 
+   * @param e
+   *          the event being delivered
+   */
+  void changeCluster(ClusterEvent e);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusteringStream.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusteringStream.java
 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusteringStream.java
new file mode 100644
index 0000000..e51b437
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/ClusteringStream.java
@@ -0,0 +1,54 @@
+package org.apache.samoa.moa.streams.clustering;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import org.apache.samoa.moa.options.AbstractOptionHandler;
+import org.apache.samoa.moa.streams.InstanceStream;
+
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+
+/**
+ * Base class for instance streams used in clustering, bundling the options
+ * for decay horizon, decay threshold, evaluation frequency and number of
+ * attributes.
+ */
+public abstract class ClusteringStream extends AbstractOptionHandler implements InstanceStream {
+
+  /** Decay horizon option (default 1000). */
+  public IntOption decayHorizonOption = new IntOption(
+      "decayHorizon", 'h', "Decay horizon", 1000, 0, Integer.MAX_VALUE);
+
+  /** Decay horizon threshold option (default 0.01, range 0 to 1). */
+  public FloatOption decayThresholdOption = new FloatOption(
+      "decayThreshold", 't', "Decay horizon threshold", 0.01, 0, 1);
+
+  /** Evaluation frequency option (default 1000). */
+  public IntOption evaluationFrequencyOption = new IntOption(
+      "evaluationFrequency", 'e', "Evaluation frequency", 1000, 0, Integer.MAX_VALUE);
+
+  /** Number of attributes to generate (default 2). */
+  public IntOption numAttsOption = new IntOption(
+      "numAtts", 'a', "The number of attributes to generate.", 2, 0, Integer.MAX_VALUE);
+
+  /**
+   * @return the current value of {@link #decayHorizonOption}
+   */
+  public int getDecayHorizon() {
+    return this.decayHorizonOption.getValue();
+  }
+
+  /**
+   * @return the current value of {@link #decayThresholdOption}
+   */
+  public double getDecayThreshold() {
+    return this.decayThresholdOption.getValue();
+  }
+
+  /**
+   * @return the current value of {@link #evaluationFrequencyOption}
+   */
+  public int getEvaluationFrequency() {
+    return this.evaluationFrequencyOption.getValue();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/RandomRBFGeneratorEvents.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/RandomRBFGeneratorEvents.java
 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/RandomRBFGeneratorEvents.java
new file mode 100644
index 0000000..c83f688
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/clustering/RandomRBFGeneratorEvents.java
@@ -0,0 +1,968 @@
+package org.apache.samoa.moa.streams.clustering;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.LinkedList;
+import java.util.Random;
+import java.util.Vector;
+
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.moa.cluster.Clustering;
+import org.apache.samoa.moa.cluster.SphereCluster;
+import org.apache.samoa.moa.core.AutoExpandVector;
+import org.apache.samoa.moa.core.DataPoint;
+import org.apache.samoa.moa.core.FastVector;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import com.github.javacliparser.FlagOption;
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+
+public class RandomRBFGeneratorEvents extends ClusteringStream {
+  private transient Vector listeners;
+
+  private static final long serialVersionUID = 1L;
+
+  public IntOption modelRandomSeedOption = new IntOption("modelRandomSeed",
+      'm', "Seed for random generation of model.", 1);
+
+  public IntOption instanceRandomSeedOption = new IntOption(
+      "instanceRandomSeed", 'i',
+      "Seed for random generation of instances.", 5);
+
+  public IntOption numClusterOption = new IntOption("numCluster", 'K',
+      "The average number of centroids in the model.", 5, 1, 
Integer.MAX_VALUE);
+
+  public IntOption numClusterRangeOption = new IntOption("numClusterRange", 
'k',
+      "Deviation of the number of centroids in the model.", 3, 0, 
Integer.MAX_VALUE);
+
+  public FloatOption kernelRadiiOption = new FloatOption("kernelRadius", 'R',
+      "The average radii of the centroids in the model.", 0.07, 0, 1);
+
+  public FloatOption kernelRadiiRangeOption = new 
FloatOption("kernelRadiusRange", 'r',
+      "Deviation of average radii of the centroids in the model.", 0, 0, 1);
+
+  public FloatOption densityRangeOption = new FloatOption("densityRange", 'd',
+      "Offset of the average weight a cluster has. Value of 0 means all 
cluster " +
+          "contain the same amount of points.", 0, 0, 1);
+
+  public IntOption speedOption = new IntOption("speed", 'V',
+      "Kernels move a predefined distance of 0.01 every X points", 500, 1, 
Integer.MAX_VALUE);
+
+  public IntOption speedRangeOption = new IntOption("speedRange", 'v',
+      "Speed/Velocity point offset", 0, 0, Integer.MAX_VALUE);
+
+  public FloatOption noiseLevelOption = new FloatOption("noiseLevel", 'N',
+      "Noise level", 0.1, 0, 1);
+
+  public FlagOption noiseInClusterOption = new FlagOption("noiseInCluster", 
'n',
+      "Allow noise to be placed within a cluster");
+
+  public IntOption eventFrequencyOption = new IntOption("eventFrequency", 'E',
+      "Event frequency. Enable at least one of the events below and set 
numClusterRange!", 30000, 0, Integer.MAX_VALUE);
+
+  public FlagOption eventMergeSplitOption = new 
FlagOption("eventMergeSplitOption", 'M',
+      "Enable merging and splitting of clusters. Set eventFrequency and 
numClusterRange!");
+
+  // Help text is shown to users; spelling corrected ("emerging", "disappearing").
+  public FlagOption eventDeleteCreateOption = new FlagOption("eventDeleteCreate", 'C',
+      "Enable emerging and disappearing of clusters. Set eventFrequency and numClusterRange!");
+
+  private double merge_threshold = 0.7;
+  private int kernelMovePointFrequency = 10;
+  private double maxDistanceMoveThresholdByStep = 0.01;
+  private int maxOverlapFitRuns = 50;
+  private double eventFrequencyRange = 0;
+
+  private boolean debug = false;
+
+  private AutoExpandVector<GeneratorCluster> kernels;
+  protected Random instanceRandom;
+  protected InstancesHeader streamHeader;
+  private int numGeneratedInstances;
+  private int numActiveKernels;
+  private int nextEventCounter;
+  private int nextEventChoice = -1;
+  private int clusterIdCounter;
+  private GeneratorCluster mergeClusterA;
+  private GeneratorCluster mergeClusterB;
+  private boolean mergeKernelsOverlapping = false;
+
+  private class GeneratorCluster implements Serializable {
+    // TODO: points is redundant to microclusterpoints, we need to come
+    // up with a good strategy that microclusters get updated and
+    // rebuild if needed. Idea: Sort microclusterpoints by timestamp and let
+    // microclusterdecay hold the timestamp for when the last point in a
+    // microcluster gets kicked, then we rebuild... or maybe not... could be
+    // same as searching for point to be kicked. more likely is we rebuild
+    // fewer times then insert.
+
+    private static final long serialVersionUID = -6301649898961112942L;
+
+    SphereCluster generator;
+    int kill = -1;
+    boolean merging = false;
+    double[] moveVector;
+    int totalMovementSteps;
+    int currentMovementSteps;
+    boolean isSplitting = false;
+
+    LinkedList<DataPoint> points = new LinkedList<DataPoint>();
+    ArrayList<SphereCluster> microClusters = new ArrayList<SphereCluster>();
+    ArrayList<ArrayList<DataPoint>> microClustersPoints = new ArrayList();
+    ArrayList<Integer> microClustersDecay = new ArrayList();
+
+    /**
+     * Creates a kernel with a random center and radius, retrying up to
+     * {@code maxOverlapFitRuns} times until the sphere lies completely inside
+     * the unit hypercube.
+     * <p>
+     * On success the kernel gets the given id, a random weight around
+     * {@code 1 / numCluster} and a random destination. On failure
+     * {@code generator} is set to {@code null} and {@code kill} to 0.
+     * 
+     * @param label
+     *          the id assigned to the generated cluster
+     */
+    public GeneratorCluster(int label) {
+      boolean outofbounds = true;
+      int tryCounter = 0;
+      while (outofbounds && tryCounter < maxOverlapFitRuns) {
+        tryCounter++;
+        outofbounds = false;
+        double[] center = new double[numAttsOption.getValue()];
+        double radius = kernelRadiiOption.getValue() + (instanceRandom.nextBoolean() ? -1 : 1)
+            * kernelRadiiRangeOption.getValue() * instanceRandom.nextDouble();
+        // Redraw until the radius is strictly positive.
+        while (radius <= 0) {
+          radius = kernelRadiiOption.getValue() + (instanceRandom.nextBoolean() ? -1 : 1)
+              * kernelRadiiRangeOption.getValue() * instanceRandom.nextDouble();
+        }
+        for (int j = 0; j < numAttsOption.getValue(); j++) {
+          center[j] = instanceRandom.nextDouble();
+          if (center[j] - radius < 0 || center[j] + radius > 1) {
+            outofbounds = true;
+            break;
+          }
+        }
+        generator = new SphereCluster(center, radius);
+      }
+      // Decide on the outcome of the last attempt rather than on the attempt
+      // count: a kernel that fits on the very last allowed try is valid too.
+      if (!outofbounds) {
+        generator.setId(label);
+        double avgWeight = 1.0 / numClusterOption.getValue();
+        double weight = avgWeight + (instanceRandom.nextBoolean() ? -1 : 1) * avgWeight * densityRangeOption.getValue()
+            * instanceRandom.nextDouble();
+        generator.setWeight(weight);
+        setDesitnation(null);
+      }
+      else {
+        generator = null;
+        kill = 0;
+        System.out.println("Tried " + maxOverlapFitRuns + " times to create kernel. Reduce average radii.");
+      }
+    }
+
+    /**
+     * Wraps an existing sphere cluster as a kernel, assigns it the given id
+     * and picks a random destination for it.
+     * 
+     * @param label
+     *          the id assigned to the cluster
+     * @param cluster
+     *          the sphere cluster this kernel generates points from
+     */
+    public GeneratorCluster(int label, SphereCluster cluster) {
+      cluster.setId(label);
+      this.generator = cluster;
+      setDesitnation(null);
+    }
+
+    public int getWorkID() {
+      for (int c = 0; c < kernels.size(); c++) {
+        if (kernels.get(c).equals(this))
+          return c;
+      }
+      return -1;
+    }
+
+    /**
+     * Ages this kernel by one step: removes it from the kernel list once its
+     * kill countdown reaches 0, drops micro clusters that have not been
+     * touched within the decay horizon and discards the oldest point if it
+     * has fallen out of the decay horizon.
+     */
+    private void updateKernel() {
+      if (kill == 0) {
+        kernels.remove(this);
+      }
+      if (kill > 0) {
+        kill--;
+      }
+      // We could be a lot more precise by keeping track of the timestamps of
+      // the points, removing all old points and rebuilding the cluster on an
+      // up-to-date point base. So far this is only used to avoid overlap, so
+      // the representation is more conservative than needed; it only has to
+      // change when a tighter representation is required.
+      //
+      // Iterate backwards: removing by index while counting upwards would
+      // skip the element that slides into the freed slot.
+      for (int m = microClusters.size() - 1; m >= 0; m--) {
+        if (numGeneratedInstances - microClustersDecay.get(m) > decayHorizonOption.getValue()) {
+          microClusters.remove(m);
+          microClustersPoints.remove(m);
+          // m is an int, so this removes by index, not by value.
+          microClustersDecay.remove(m);
+        }
+      }
+
+      if (!points.isEmpty()
+          && numGeneratedInstances - points.getFirst().getTimestamp() >= decayHorizonOption.getValue()) {
+        points.removeFirst();
+      }
+
+    }
+
+    /**
+     * Records a generated instance as a point of this kernel and files it
+     * into a micro cluster: an existing one if the point lies inside it,
+     * otherwise a newly created one.
+     * 
+     * @param instance
+     *          the instance generated from this kernel
+     */
+    private void addInstance(Instance instance) {
+      DataPoint point = new DataPoint(instance, numGeneratedInstances);
+      points.add(point);
+
+      int closestIndex = -1;
+      double closestHullDistance = Double.MAX_VALUE;
+      boolean inserted = false;
+      // Scan from the most recently built micro cluster backwards, so that
+      // earlier clusters stop being refreshed and can be removed sooner.
+      for (int m = microClusters.size() - 1; m >= 0; m--) {
+        SphereCluster candidate = microClusters.get(m);
+        double hullDistance = candidate.getCenterDistance(point) - candidate.getRadius();
+        if (hullDistance <= 0) {
+          // The point fits into this existing micro cluster.
+          microClustersPoints.get(m).add(point);
+          microClustersDecay.set(m, numGeneratedInstances);
+          inserted = true;
+          break;
+        }
+        // Otherwise remember it if it is the closest micro cluster so far.
+        if (hullDistance < closestHullDistance) {
+          closestIndex = m;
+          closestHullDistance = hullDistance;
+        }
+      }
+      // Resetting the index choice selects the alternative cluster building.
+      int alt = 1;
+      if (alt == 1) {
+        closestIndex = -1;
+      }
+      if (inserted) {
+        return;
+      }
+
+      // Add to the closest micro cluster and expand it.
+      if (closestIndex != -1) {
+        ArrayList<DataPoint> members = microClustersPoints.get(closestIndex);
+        members.add(point);
+        // We should keep the miniball instances and just check in new points
+        // instead of rebuilding the whole thing.
+        SphereCluster expanded = new SphereCluster(microClustersPoints.get(closestIndex), numAttsOption.getValue());
+        if (expanded.getRadius() > generator.getRadius()) {
+          // The expanded micro cluster would outgrow the generating cluster:
+          // take the point out again and fall through to a new micro cluster.
+          members.remove(members.size() - 1);
+          closestIndex = -1;
+        }
+        else {
+          microClusters.set(closestIndex, expanded);
+          microClustersDecay.set(closestIndex, numGeneratedInstances);
+        }
+      }
+
+      // closestIndex might have been reset above: create a new micro cluster.
+      if (closestIndex == -1) {
+        ArrayList<DataPoint> microPoints = new ArrayList<DataPoint>();
+        microPoints.add(point);
+        SphereCluster sphere;
+        if (alt == 0) {
+          sphere = new SphereCluster(microPoints, numAttsOption.getValue());
+        }
+        else {
+          sphere = new SphereCluster(generator.getCenter(), generator.getRadius(), 1);
+        }
+
+        microClusters.add(sphere);
+        microClustersPoints.add(microPoints);
+        microClustersDecay.add(numGeneratedInstances);
+        // The ground truth is this kernel's position in the kernel list
+        // (kernels.size() if this kernel is not in the list).
+        int kernelIndex = 0;
+        while (kernelIndex < kernels.size() && kernels.get(kernelIndex) != this) {
+          kernelIndex++;
+        }
+        sphere.setGroundTruth(kernelIndex);
+      }
+
+    }
+
+    /**
+     * Advances this kernel by one movement step along its move vector. When
+     * the step would push the sphere outside the unit hypercube, a new random
+     * destination is chosen and the step is retried. Once all steps of the
+     * current movement are done, a new random destination is chosen unless
+     * the kernel is merging.
+     */
+    private void move() {
+      if (currentMovementSteps >= totalMovementSteps) {
+        // Current movement finished: pick a new target unless we are merging.
+        if (!merging) {
+          setDesitnation(null);
+          isSplitting = false;
+        }
+        return;
+      }
+
+      currentMovementSteps++;
+      if (moveVector == null) {
+        return;
+      }
+
+      double[] position = generator.getCenter();
+      boolean leftBounds;
+      do {
+        double radius = generator.getRadius();
+        leftBounds = false;
+        // Start each attempt from the generator's current center.
+        position = generator.getCenter();
+        for (int d = 0; d < position.length; d++) {
+          position[d] += moveVector[d];
+          if (position[d] - radius < 0 || position[d] + radius > 1) {
+            leftBounds = true;
+            setDesitnation(null);
+            break;
+          }
+        }
+      } while (leftBounds);
+      generator.setCenter(position);
+    }
+
+    /**
+     * Points this kernel towards a destination by setting its move vector to
+     * the difference between the destination and the current center.
+     * 
+     * @param destination
+     *          the target position, or {@code null} to draw a random one with
+     *          one coordinate per attribute
+     */
+    void setDesitnation(double[] destination) {
+      double[] target = destination;
+      if (target == null) {
+        target = new double[numAttsOption.getValue()];
+        for (int j = 0; j < numAttsOption.getValue(); j++) {
+          target[j] = instanceRandom.nextDouble();
+        }
+      }
+
+      double[] center = generator.getCenter();
+      double[] direction = new double[center.length];
+      for (int d = 0; d < direction.length; d++) {
+        direction[d] = target[d] - center[d];
+      }
+      setMoveVector(direction);
+    }
+
+    /**
+     * Installs the given displacement as this kernel's movement: derives the
+     * number of movement steps from the displacement's length and the
+     * (optionally randomized) speed, scales the vector in place to a
+     * per-step displacement and restarts the step counter.
+     * 
+     * @param vector
+     *          the total displacement; it is stored and modified in place
+     */
+    void setMoveVector(double[] vector) {
+      // we are ignoring the steps, otherwise we have to change
+      // speed of the kernels, do we want that?
+      moveVector = vector;
+
+      int speedInPoints = speedOption.getValue();
+      if (speedRangeOption.getValue() > 0) {
+        int sign = instanceRandom.nextBoolean() ? -1 : 1;
+        int offset = instanceRandom.nextInt(speedRangeOption.getValue());
+        speedInPoints += sign * offset;
+      }
+      if (speedInPoints < 1) {
+        speedInPoints = speedOption.getValue();
+      }
+
+      // Euclidean length of the displacement.
+      double squaredSum = 0;
+      for (double component : vector) {
+        squaredSum += Math.pow(component, 2);
+      }
+      double length = Math.sqrt(squaredSum);
+
+      totalMovementSteps = (int) (length / (maxDistanceMoveThresholdByStep * kernelMovePointFrequency) * speedInPoints);
+      for (int d = 0; d < moveVector.length; d++) {
+        moveVector[d] /= (double) totalMovementSteps;
+      }
+
+      currentMovementSteps = 0;
+    }
+
+    /**
+     * Merges the given kernel into this one once their overlap exceeds
+     * {@code merge_threshold}. The absorbed kernel gets weight 0 and is
+     * scheduled to be killed after the decay horizon.
+     * 
+     * @param merge
+     *          the kernel to be absorbed
+     * @return a message describing the merge, a notice when the kernels first
+     *         start to overlap, or an empty string if nothing happened
+     */
+    private String tryMerging(GeneratorCluster merge) {
+      double overlapDegree = generator.overlapRadiusDegree(merge.generator);
+
+      if (!(overlapDegree > merge_threshold)) {
+        // Not overlapping enough yet; report the first contact only once.
+        if (overlapDegree > 0 && !mergeKernelsOverlapping) {
+          mergeKernelsOverlapping = true;
+          return "Merge overlapping started";
+        }
+        return "";
+      }
+
+      SphereCluster absorbed = merge.generator;
+      double radius = Math.max(generator.getRadius(), absorbed.getRadius());
+      generator.combine(absorbed);
+      // Keep the larger of the two radii: the combined radius gets bigger
+      // and bigger with high dimensional data.
+      generator.setRadius(radius);
+
+      String message = "Clusters merging: " + mergeClusterB.generator.getId() + " into "
+          + mergeClusterA.generator.getId();
+
+      // Clean up and reset the merging state. Mark the absorbed kernel so it
+      // gets killed when it doesn't contain any more instances.
+      merge.kill = decayHorizonOption.getValue();
+      // Weight 0 means no new instances will be created in the absorbed cluster.
+      absorbed.setWeight(0.0);
+      normalizeWeights();
+      numActiveKernels--;
+      mergeClusterA = null;
+      mergeClusterB = null;
+      merging = false;
+      mergeKernelsOverlapping = false;
+      return message;
+    }
+
+    /**
+     * Splits a new kernel off this one.
+     * <p>
+     * The new kernel starts at this kernel's center with the configured kernel
+     * radius and a weight drawn around the average cluster weight. It is added
+     * to the kernel list, all weights are renormalized and the number of active
+     * kernels is incremented. Both kernels are flagged as splitting.
+     *
+     * @return a message naming the kernel the new one was split from
+     */
+    private String splitKernel() {
+      isSplitting = true;
+      // todo radius range
+      double radius = kernelRadiiOption.getValue();
+      double avgWeight = 1.0 / numClusterOption.getValue();
+      double weight = avgWeight
+          + avgWeight * densityRangeOption.getValue() * instanceRandom.nextDouble();
+
+      // The former null check on the freshly constructed cluster could never
+      // fail, so its "not enough room" branch was unreachable and is dropped.
+      SphereCluster spcluster = new SphereCluster(generator.getCenter(), radius, weight);
+
+      GeneratorCluster gc = new GeneratorCluster(clusterIdCounter++, spcluster);
+      gc.isSplitting = true;
+      kernels.add(gc);
+      normalizeWeights();
+      numActiveKernels++;
+      return "Split from Kernel " + generator.getId();
+    }
+
+    /**
+     * Starts fading this kernel out: sets its {@code kill} countdown to the
+     * decay horizon, sets the generator weight to zero, decrements the number
+     * of active kernels and renormalizes the remaining weights.
+     *
+     * @return a message naming the kernel that is fading out
+     */
+    private String fadeOut() {
+      kill = decayHorizonOption.getValue();
+      generator.setWeight(0.0);
+      numActiveKernels--;
+      normalizeWeights();
+      return "Fading out C" + generator.getId();
+    }
+
+  }
+
+  /**
+   * Creates the generator with {@code noiseInClusterOption} set; the two event
+   * options are left untouched (their activation is commented out).
+   */
+  public RandomRBFGeneratorEvents() {
+    noiseInClusterOption.set();
+    // eventDeleteCreateOption.set();
+    // eventMergeSplitOption.set();
+  }
+
+  /**
+   * @return the stream header built by {@code generateHeader()}
+   */
+  public InstancesHeader getHeader() {
+    return streamHeader;
+  }
+
+  /**
+   * @return always {@code -1}; no estimate is given (this stream reports that
+   *         it always has more instances)
+   */
+  public long estimatedRemainingInstances() {
+    return -1;
+  }
+
+  /**
+   * @return always {@code true}
+   */
+  public boolean hasMoreInstances() {
+    return true;
+  }
+
+  /**
+   * @return always {@code true}; see {@code restart()}
+   */
+  public boolean isRestartable() {
+    return true;
+  }
+
+  /**
+   * Prepares the generator: reports the activity to {@code monitor}, builds
+   * the stream header and resets the generator state via {@code restart()}.
+   *
+   * @param monitor
+   *          receives the "Preparing random RBF..." activity notification
+   * @param repository
+   *          not used by this implementation
+   */
+  @Override
+  public void prepareForUseImpl(TaskMonitor monitor, ObjectRepository 
repository) {
+    monitor.setCurrentActivity("Preparing random RBF...", -1.0);
+    generateHeader();
+    restart();
+  }
+
+  /**
+   * Resets the generator to its initial state: re-seeds the random number
+   * generator, clears all counters and merge bookkeeping, replaces the kernel
+   * list, schedules the first event and creates the initial kernels.
+   */
+  public void restart() {
+    instanceRandom = new Random(instanceRandomSeedOption.getValue());
+
+    // Reset all counters BEFORE choosing the first event: getNextEvent() reads
+    // numActiveKernels, so a stale value from a previous run would change both
+    // the chosen event and the number of random draws, making a restarted
+    // stream differ from a freshly prepared one.
+    numActiveKernels = 0;
+    numGeneratedInstances = 0;
+    clusterIdCounter = 0;
+    mergeClusterA = mergeClusterB = null;
+    kernels = new AutoExpandVector<GeneratorCluster>();
+
+    nextEventCounter = eventFrequencyOption.getValue();
+    nextEventChoice = getNextEvent();
+
+    initKernels();
+  }
+
+  protected void generateHeader() { // 2013/06/02: Noise label
+    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
+    for (int i = 0; i < this.numAttsOption.getValue(); i++) {
+      attributes.add(new Attribute("att" + (i + 1)));
+    }
+
+    ArrayList<String> classLabels = new ArrayList<String>();
+    for (int i = 0; i < this.numClusterOption.getValue(); i++) {
+      classLabels.add("class" + (i + 1));
+    }
+    if (noiseLevelOption.getValue() > 0)
+      classLabels.add("noise"); // The last label = "noise"
+
+    attributes.add(new Attribute("class", classLabels));
+    streamHeader = new InstancesHeader(new 
Instances(getCLICreationString(InstanceStream.class), attributes, 0));
+    streamHeader.setClassIndex(streamHeader.numAttributes() - 1);
+  }
+
+  /**
+   * Creates the initial kernels, one per configured cluster, giving each the
+   * next free cluster id, and then normalizes the kernel weights.
+   */
+  protected void initKernels() {
+    for (int i = 0; i < numClusterOption.getValue(); i++) {
+      kernels.add(new GeneratorCluster(clusterIdCounter));
+      numActiveKernels++;
+      clusterIdCounter++;
+    }
+    normalizeWeights();
+  }
+
+  /**
+   * Generates the next instance of the stream.
+   * <p>
+   * The event scheduler is advanced first. Then, with probability given by the
+   * noise level, a noise point is produced; otherwise a kernel is chosen by
+   * weight and a point is sampled from it. Noise instances get the class value
+   * {@code numClusterOption.getValue()} (the "noise" label added by
+   * {@code generateHeader()} when the noise level is greater than zero); all
+   * other instances get the id of the generating kernel as class value and are
+   * added to that kernel.
+   *
+   * @return the generated instance wrapped in an {@code InstanceExample}
+   */
+  public InstanceExample nextInstance() {
+    numGeneratedInstances++;
+    eventScheduler();
+
+    // make room for the classlabel
+    double[] values_new = new double[numAttsOption.getValue()]; // +1
+    double[] values = null;
+    // -1 marks a noise instance (no generating kernel)
+    int clusterChoice = -1;
+
+    if (instanceRandom.nextDouble() > noiseLevelOption.getValue()) {
+      clusterChoice = chooseWeightedElement();
+      values = 
kernels.get(clusterChoice).generator.sample(instanceRandom).toDoubleArray();
+    }
+    else {
+      // get random noise point
+      values = getNoisePoint();
+    }
+
+    // Only reported, not repaired: the instance is still emitted.
+    if (Double.isNaN(values[0])) {
+      System.out.println("Instance corrupted:" + numGeneratedInstances);
+    }
+    System.arraycopy(values, 0, values_new, 0, values.length);
+
+    Instance inst = new DenseInstance(1.0, values_new);
+    inst.setDataset(getHeader());
+    if (clusterChoice == -1) {
+      // 2013/06/02 (Yunsu Kim)
+      // Noise instance has the last class value instead of "-1"
+      // Preventing ArrayIndexOutOfBoundsException in WriteStreamToARFFFile
+      inst.setClassValue(numClusterOption.getValue());
+    }
+    else {
+      inst.setClassValue(kernels.get(clusterChoice).generator.getId());
+      // Do we need micro cluster representation if have overlapping clusters?
+      // if(!overlappingOption.isSet())
+      kernels.get(clusterChoice).addInstance(inst);
+    }
+    // System.out.println(numGeneratedInstances+": Overlap 
is"+updateOverlaps());
+
+    return new InstanceExample(inst);
+  }
+
+  public Clustering getGeneratingClusters() {
+    Clustering clustering = new Clustering();
+    for (int c = 0; c < kernels.size(); c++) {
+      clustering.add(kernels.get(c).generator);
+    }
+    return clustering;
+  }
+
+  /**
+   * Collects the micro clusters of all kernels into one clustering.
+   * <p>
+   * As a side effect every micro cluster is renumbered with a running id
+   * (starting at 0) and tagged with the id of its kernel's generator as
+   * ground truth.
+   *
+   * @return a new clustering holding all micro clusters
+   */
+  public Clustering getMicroClustering() {
+    Clustering micro = new Clustering();
+    int nextId = 0;
+
+    for (int k = 0; k < kernels.size(); k++) {
+      GeneratorCluster kernel = kernels.get(k);
+      for (int m = 0; m < kernel.microClusters.size(); m++) {
+        kernel.microClusters.get(m).setId(nextId++);
+        kernel.microClusters.get(m).setGroundTruth(kernel.generator.getId());
+        micro.add(kernel.microClusters.get(m));
+      }
+    }
+
+    return micro;
+  }
+
+  /**************************** EVENTS 
******************************************/
+  /**
+   * Advances the event state machine by one generated instance (called from
+   * {@code nextInstance()}).
+   * <p>
+   * Every call updates all kernels and decrements the event counter; kernels
+   * are moved whenever the counter is a multiple of
+   * {@code kernelMovePointFrequency}. If the event frequency is 0 nothing else
+   * happens. Otherwise the pending event ({@code nextEventChoice}: 0 = merge,
+   * 1 = split, 2 = delete, 3 = create) is processed; once it has finished, a
+   * new counter value and a new event are drawn. A non-empty message is
+   * reported to the cluster change listeners, except for merge messages that
+   * do not start with "Clusters merging".
+   */
+  private void eventScheduler() {
+
+    for (int i = 0; i < kernels.size(); i++) {
+      kernels.get(i).updateKernel();
+    }
+
+    nextEventCounter--;
+    // only move kernels every 10 points, performance reasons????
+    // should this be randomized as well???
+    if (nextEventCounter % kernelMovePointFrequency == 0) {
+      // move kernels
+      for (int i = 0; i < kernels.size(); i++) {
+        kernels.get(i).move();
+        // overlapControl();
+      }
+    }
+
+    // events are disabled entirely when the frequency is 0
+    if (eventFrequencyOption.getValue() == 0) {
+      return;
+    }
+
+    String type = "";
+    String message = "";
+    boolean eventFinished = false;
+    switch (nextEventChoice) {
+    // merge: attempted on every call (not only when the counter has run out);
+    // finished once the merge was performed and the merge pair was cleared
+    case 0:
+      if (numActiveKernels > 1 && numActiveKernels > 
numClusterOption.getValue() - numClusterRangeOption.getValue()) {
+        message = mergeKernels(nextEventCounter);
+        type = "Merge";
+      }
+      if (mergeClusterA == null && mergeClusterB == null && 
message.startsWith("Clusters merging")) {
+        eventFinished = true;
+      }
+      break;
+    // split: fires when the counter has run out, if below the upper limit
+    case 1:
+      if (nextEventCounter <= 0) {
+        if (numActiveKernels < numClusterOption.getValue() + 
numClusterRangeOption.getValue()) {
+          type = "Split";
+          message = splitKernel();
+        }
+        eventFinished = true;
+      }
+      break;
+    // delete: fires when the counter has run out, if above the lower limit
+    case 2:
+      if (nextEventCounter <= 0) {
+        if (numActiveKernels > 1 && numActiveKernels > 
numClusterOption.getValue() - numClusterRangeOption.getValue()) {
+          message = fadeOut();
+          type = "Delete";
+        }
+        eventFinished = true;
+      }
+      break;
+    // create: fires when the counter has run out, if below the upper limit
+    case 3:
+      if (nextEventCounter <= 0) {
+        if (numActiveKernels < numClusterOption.getValue() + 
numClusterRangeOption.getValue()) {
+          message = fadeIn();
+          type = "Create";
+        }
+        eventFinished = true;
+      }
+      break;
+
+    }
+    // schedule the next event: base frequency plus/minus a random fraction
+    // (bounded by eventFrequencyRange) of it
+    if (eventFinished) {
+      nextEventCounter = (int) (eventFrequencyOption.getValue() + 
(instanceRandom.nextBoolean() ? -1 : 1)
+          * eventFrequencyOption.getValue() * eventFrequencyRange * 
instanceRandom.nextDouble());
+      nextEventChoice = getNextEvent();
+      // System.out.println("Next event choice: "+nextEventChoice);
+    }
+    if (!message.isEmpty()) {
+      message += " (numKernels = " + numActiveKernels + " at " + 
numGeneratedInstances + ")";
+      // intermediate merge messages ("Init merge", ...) are not reported
+      if (!type.equals("Merge") || message.startsWith("Clusters merging"))
+        fireClusterChange(numGeneratedInstances, type, message);
+    }
+  }
+
+  private int getNextEvent() {
+    int choice = -1;
+    boolean lowerLimit = numActiveKernels <= numClusterOption.getValue() - 
numClusterRangeOption.getValue();
+    boolean upperLimit = numActiveKernels >= numClusterOption.getValue() + 
numClusterRangeOption.getValue();
+
+    if (!lowerLimit || !upperLimit) {
+      int mode = -1;
+      if (eventDeleteCreateOption.isSet() && eventMergeSplitOption.isSet()) {
+        mode = instanceRandom.nextInt(2);
+      }
+
+      if (mode == 0 || (mode == -1 && eventMergeSplitOption.isSet())) {
+        // have we reached a limit? if not free choice
+        if (!lowerLimit && !upperLimit)
+          choice = instanceRandom.nextInt(2);
+        else
+        // we have a limit. if lower limit, choose split
+        if (lowerLimit)
+          choice = 1;
+        // otherwise we reached upper level, choose merge
+        else
+          choice = 0;
+      }
+
+      if (mode == 1 || (mode == -1 && eventDeleteCreateOption.isSet())) {
+        // have we reached a limit? if not free choice
+        if (!lowerLimit && !upperLimit)
+          choice = instanceRandom.nextInt(2) + 2;
+        else
+        // we have a limit. if lower limit, choose create
+        if (lowerLimit)
+          choice = 3;
+        // otherwise we reached upper level, choose delete
+        else
+          choice = 2;
+      }
+    }
+
+    return choice;
+  }
+
+  /**
+   * Picks a random kernel that is not already scheduled to be killed
+   * ({@code kill == -1}) and starts fading it out.
+   *
+   * @return the message produced by the chosen kernel
+   */
+  private String fadeOut() {
+    int victim;
+    do {
+      victim = instanceRandom.nextInt(kernels.size());
+    } while (kernels.get(victim).kill != -1);
+
+    return kernels.get(victim).fadeOut();
+  }
+
+  /**
+   * Adds a brand-new kernel with the next free cluster id, counts it as
+   * active and renormalizes all kernel weights.
+   *
+   * @return the fixed message "Creating new cluster"
+   */
+  private String fadeIn() {
+    kernels.add(new GeneratorCluster(clusterIdCounter++));
+    numActiveKernels++;
+    normalizeWeights();
+    return "Creating new cluster";
+  }
+
+  private String changeWeight(boolean increase) {
+    double changeRate = 0.1;
+    int id = instanceRandom.nextInt(kernels.size());
+    while (kernels.get(id).kill != -1)
+      id = instanceRandom.nextInt(kernels.size());
+
+    int sign = 1;
+    if (!increase)
+      sign = -1;
+    double weight_old = kernels.get(id).generator.getWeight();
+    double delta = sign * numActiveKernels * instanceRandom.nextDouble() * 
changeRate;
+    kernels.get(id).generator.setWeight(weight_old + delta);
+
+    normalizeWeights();
+
+    String message;
+    if (increase)
+      message = "Increase ";
+    else
+      message = "Decrease ";
+    message += " weight on Cluster " + id + " from " + weight_old + " to " + 
(weight_old + delta);
+    return message;
+
+  }
+
+  private String changeRadius(boolean increase) {
+    double maxChangeRate = 0.1;
+    int id = instanceRandom.nextInt(kernels.size());
+    while (kernels.get(id).kill != -1)
+      id = instanceRandom.nextInt(kernels.size());
+
+    int sign = 1;
+    if (!increase)
+      sign = -1;
+
+    double r_old = kernels.get(id).generator.getRadius();
+    double r_new = r_old + sign * r_old * instanceRandom.nextDouble() * 
maxChangeRate;
+    if (r_new >= 0.5)
+      return "Radius to big";
+    kernels.get(id).generator.setRadius(r_new);
+
+    String message;
+    if (increase)
+      message = "Increase ";
+    else
+      message = "Decrease ";
+    message += " radius on Cluster " + id + " from " + r_old + " to " + r_new;
+    return message;
+  }
+
+  /**
+   * Picks a random kernel that is not scheduled to be killed
+   * ({@code kill == -1}) and lets it split off a new kernel.
+   *
+   * @return the message produced by the chosen kernel
+   */
+  private String splitKernel() {
+    int parent;
+    do {
+      parent = instanceRandom.nextInt(kernels.size());
+    } while (kernels.get(parent).kill != -1);
+
+    return kernels.get(parent).splitKernel();
+  }
+
+  /**
+   * Drives a merge of two kernels.
+   * <p>
+   * When no merge is in progress, a pair of live kernels ({@code kill == -1})
+   * is selected whose center distance is closest to twice the distance a
+   * kernel is expected to cover in {@code steps}; both are flagged as merging
+   * and sent towards a common merge point. When a pair is already selected,
+   * the actual merge is attempted via {@code tryMerging}.
+   *
+   * @param steps
+   *          value of the event counter, used to estimate how far a kernel can
+   *          move before the event is due
+   * @return "Init merge" when a pair was just selected, the result of
+   *         {@code tryMerging} when a pair was already selected, otherwise the
+   *         empty string
+   */
+  private String mergeKernels(int steps) {
+    if (numActiveKernels > 1 && ((mergeClusterA == null && mergeClusterB == 
null))) {
+
+      // choose clusters to merge
+      // NOTE(review): steps / speedOption.getValue() is an integer division
+      // (both operands are int) - confirm the truncation is intended.
+      double diseredDist = steps / speedOption.getValue() * 
maxDistanceMoveThresholdByStep;
+      double minDist = Double.MAX_VALUE;
+      // System.out.println("DisredDist:"+(2*diseredDist));
+      for (int i = 0; i < kernels.size(); i++) {
+        for (int j = 0; j < i; j++) {
+          // skip pairs in which a kernel is already scheduled to be killed
+          if (kernels.get(i).kill != -1 || kernels.get(j).kill != -1) {
+            continue;
+          }
+          else {
+            double kernelDist = 
kernels.get(i).generator.getCenterDistance(kernels.get(j).generator);
+            double d = kernelDist - 2 * diseredDist;
+            // System.out.println("Dist:"+i+" / "+j+" "+d);
+            // The first accepted pair must not be closer than the desired
+            // distance (d > 0, or practically equal); once a pair has been
+            // accepted, any pair with a smaller |d| replaces it.
+            if (Math.abs(d) < minDist &&
+                (minDist != Double.MAX_VALUE || d > 0 || Math.abs(d) < 0.001)) 
{
+              minDist = Math.abs(d);
+              mergeClusterA = kernels.get(i);
+              mergeClusterB = kernels.get(j);
+            }
+          }
+        }
+      }
+
+      if (mergeClusterA != null && mergeClusterB != null) {
+        // merge point = center of A moved by half the distance vector
+        // (presumably the midpoint between both centers).
+        // NOTE(review): the array returned by getCenter() is modified in place
+        // - confirm that getCenter() returns a copy.
+        double[] merge_point = mergeClusterA.generator.getCenter();
+        double[] v = 
mergeClusterA.generator.getDistanceVector(mergeClusterB.generator);
+        for (int i = 0; i < v.length; i++) {
+          merge_point[i] = merge_point[i] + v[i] * 0.5;
+        }
+
+        mergeClusterA.merging = true;
+        mergeClusterB.merging = true;
+        mergeClusterA.setDesitnation(merge_point);
+        mergeClusterB.setDesitnation(merge_point);
+
+        if (debug) {
+          System.out.println("Center1" + 
Arrays.toString(mergeClusterA.generator.getCenter()));
+          System.out.println("Center2" + 
Arrays.toString(mergeClusterB.generator.getCenter()));
+          System.out.println("Vector" + Arrays.toString(v));
+
+          System.out.println("Try to merge cluster " + 
mergeClusterA.generator.getId() +
+              " into " + mergeClusterB.generator.getId() +
+              " at " + Arrays.toString(merge_point) +
+              " time " + numGeneratedInstances);
+        }
+        return "Init merge";
+      }
+    }
+
+    if (mergeClusterA != null && mergeClusterB != null) {
+
+      // movekernels will move the kernels close to each other,
+      // we just need to check and merge here if they are close enough
+      return mergeClusterA.tryMerging(mergeClusterB);
+    }
+
+    return "";
+  }
+
+  /************************* TOOLS **************************************/
+
+  /**
+   * Intentionally empty: nothing is appended to {@code sb}.
+   *
+   * @param sb
+   *          not modified
+   * @param indent
+   *          ignored
+   */
+  public void getDescription(StringBuilder sb, int indent) {
+
+  }
+
+  private double[] getNoisePoint() {
+    double[] sample = new double[numAttsOption.getValue()];
+    boolean incluster = true;
+    int counter = 20;
+    while (incluster) {
+      for (int j = 0; j < numAttsOption.getValue(); j++) {
+        sample[j] = instanceRandom.nextDouble();
+      }
+      incluster = false;
+      if (!noiseInClusterOption.isSet() && counter > 0) {
+        counter--;
+        for (int c = 0; c < kernels.size(); c++) {
+          for (int m = 0; m < kernels.get(c).microClusters.size(); m++) {
+            Instance inst = new DenseInstance(1, sample);
+            if 
(kernels.get(c).microClusters.get(m).getInclusionProbability(inst) > 0) {
+              incluster = true;
+              break;
+            }
+          }
+          if (incluster)
+            break;
+        }
+      }
+    }
+
+    // double [] sample = new double [numAttsOption.getValue()];
+    // for (int j = 0; j < numAttsOption.getValue(); j++) {
+    // sample[j] = instanceRandom.nextDouble();
+    // }
+
+    return sample;
+  }
+
+  private int chooseWeightedElement() {
+    double r = instanceRandom.nextDouble();
+
+    // Determine index of choosen element
+    int i = 0;
+    while (r > 0.0) {
+      r -= kernels.get(i).generator.getWeight();
+      i++;
+    }
+    --i; // Overcounted once
+    // System.out.println(i);
+    return i;
+  }
+
+  /**
+   * Rescales the weights of all kernels so that they sum to one.
+   * <p>
+   * If the weights currently sum to zero (no kernels, or only kernels with
+   * weight zero) they are left unchanged, because dividing by zero would turn
+   * every weight into NaN.
+   */
+  private void normalizeWeights() {
+    double sumWeights = 0.0;
+    for (int i = 0; i < kernels.size(); i++) {
+      sumWeights += kernels.get(i).generator.getWeight();
+    }
+    if (sumWeights == 0.0) {
+      return;
+    }
+    for (int i = 0; i < kernels.size(); i++) {
+      kernels.get(i).generator.setWeight(kernels.get(i).generator.getWeight() / sumWeights);
+    }
+  }
+
+  /*************** EVENT Listener *********************/
+  // should go into the superclass of the generator, create new one for cluster
+  // streams?
+
+  /**
+   * Registers a listener for cluster change events. The listener list is
+   * created on first use.
+   *
+   * @param l
+   *          the listener to add
+   */
+  synchronized public void addClusterChangeListener(ClusterEventListener l) {
+    if (listeners == null)
+      listeners = new Vector();
+    listeners.addElement(l);
+  }
+
+  /**
+   * Unregisters a listener for cluster change events. If no listener list
+   * exists yet, an empty one is created first.
+   *
+   * @param l
+   *          the listener to remove
+   */
+  synchronized public void removeClusterChangeListener(ClusterEventListener l) 
{
+    if (listeners == null)
+      listeners = new Vector();
+    listeners.removeElement(l);
+  }
+
+  /**
+   * Sends a {@code ClusterEvent} to all registered listeners.
+   * <p>
+   * The listener list is copied while holding this object's monitor, and the
+   * listeners are notified from the copy outside of it, so listeners may add
+   * or remove listeners while being notified.
+   *
+   * @param timestamp
+   *          time stamp stored in the event
+   * @param type
+   *          event type stored in the event
+   * @param message
+   *          message stored in the event
+   */
+  protected void fireClusterChange(long timestamp, String type, String message) {
+    // without listeners there is nothing to do
+    if (listeners == null || listeners.isEmpty()) {
+      return;
+    }
+
+    ClusterEvent event = new ClusterEvent(this, timestamp, type, message);
+
+    // work on a snapshot in case anyone adds/removes listeners meanwhile
+    Vector snapshot;
+    synchronized (this) {
+      snapshot = (Vector) listeners.clone();
+    }
+
+    // call changeCluster on every listener in the snapshot
+    for (Enumeration it = snapshot.elements(); it.hasMoreElements();) {
+      ((ClusterEventListener) it.nextElement()).changeCluster(event);
+    }
+  }
+
+  /**
+   * @return a one-sentence description of what this generator produces
+   */
+  @Override
+  public String getPurposeString() {
+    return "Generates a random radial basis function stream.";
+  }
+
+  /**
+   * @return always the empty string
+   */
+  public String getParameterString() {
+    return "";
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/moa/streams/generators/HyperplaneGenerator.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/moa/streams/generators/HyperplaneGenerator.java
 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/generators/HyperplaneGenerator.java
new file mode 100644
index 0000000..b05fa1b
--- /dev/null
+++ 
b/samoa-api/src/main/java/org/apache/samoa/moa/streams/generators/HyperplaneGenerator.java
@@ -0,0 +1,186 @@
+package org.apache.samoa.moa.streams.generators;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2014 - 2015 Apache Software Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.util.Random;
+
+import org.apache.samoa.instances.Attribute;
+import org.apache.samoa.instances.DenseInstance;
+import org.apache.samoa.instances.Instance;
+import org.apache.samoa.instances.Instances;
+import org.apache.samoa.instances.InstancesHeader;
+import org.apache.samoa.moa.core.Example;
+import org.apache.samoa.moa.core.FastVector;
+import org.apache.samoa.moa.core.InstanceExample;
+import org.apache.samoa.moa.core.ObjectRepository;
+import org.apache.samoa.moa.options.AbstractOptionHandler;
+import org.apache.samoa.moa.streams.InstanceStream;
+import org.apache.samoa.moa.tasks.TaskMonitor;
+
+import com.github.javacliparser.FloatOption;
+import com.github.javacliparser.IntOption;
+
+/**
+ * Stream generator for Hyperplane data stream.
+ * 
+ * @author Albert Bifet (abifet at cs dot waikato dot ac dot nz)
+ * @version $Revision: 7 $
+ */
+public class HyperplaneGenerator extends AbstractOptionHandler implements InstanceStream {
+
+  @Override
+  public String getPurposeString() {
+    return "Generates a problem of predicting class of a rotating hyperplane.";
+  }
+
+  private static final long serialVersionUID = 1L;
+
+  /** Seed of the random number generator used for instances and weights. */
+  public IntOption instanceRandomSeedOption = new IntOption("instanceRandomSeed", 'i',
+      "Seed for random generation of instances.", 1);
+
+  /** Number of class labels declared in the stream header. */
+  public IntOption numClassesOption = new IntOption("numClasses", 'c',
+      "The number of classes to generate.", 2, 2, Integer.MAX_VALUE);
+
+  /** Number of numeric attributes of every instance. */
+  public IntOption numAttsOption = new IntOption("numAtts", 'a',
+      "The number of attributes to generate.", 10, 0, Integer.MAX_VALUE);
+
+  /** Number of leading attributes whose hyperplane weight drifts. */
+  public IntOption numDriftAttsOption = new IntOption("numDriftAtts", 'k',
+      "The number of attributes with drift.", 2, 0, Integer.MAX_VALUE);
+
+  /** Amount added to (or subtracted from) a drifting weight per instance. */
+  public FloatOption magChangeOption = new FloatOption("magChange", 't',
+      "Magnitude of the change for every example", 0.0, 0.0, 1.0);
+
+  /** Percentage of instances whose class label is flipped. */
+  public IntOption noisePercentageOption = new IntOption("noisePercentage", 'n',
+      "Percentage of noise to add to the data.", 5, 0, 100);
+
+  /** Percentage chance, per instance and drifting weight, that the drift direction flips. */
+  public IntOption sigmaPercentageOption = new IntOption("sigmaPercentage", 's',
+      "Percentage of probability that the direction of change is reversed.", 10, 0, 100);
+
+  protected InstancesHeader streamHeader;
+
+  protected Random instanceRandom;
+
+  /** Hyperplane weights, one per attribute. */
+  protected double[] weights;
+
+  /** Drift direction per attribute: 1 or -1 for drifting attributes, 0 otherwise. */
+  protected int[] sigma;
+
+  public int numberInstance;
+
+  /**
+   * Prepares the generator: reports the activity to {@code monitor}, builds
+   * the stream header and resets the generator state via {@link #restart()}.
+   */
+  @Override
+  protected void prepareForUseImpl(TaskMonitor monitor, ObjectRepository repository) {
+    monitor.setCurrentActivity("Preparing hyperplane...", -1.0);
+    generateHeader();
+    restart();
+  }
+
+  /**
+   * Builds the stream header: attributes "att1".."attN" followed by a nominal
+   * "class" attribute with labels "class1".."classM"; the class attribute is
+   * the last attribute.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  protected void generateHeader() {
+    FastVector attributes = new FastVector();
+    for (int i = 0; i < this.numAttsOption.getValue(); i++) {
+      attributes.addElement(new Attribute("att" + (i + 1)));
+    }
+
+    FastVector classLabels = new FastVector();
+    for (int i = 0; i < this.numClassesOption.getValue(); i++) {
+      classLabels.addElement("class" + (i + 1));
+    }
+    attributes.addElement(new Attribute("class", classLabels));
+    this.streamHeader = new InstancesHeader(
+        new Instances(getCLICreationString(InstanceStream.class), attributes, 0));
+    this.streamHeader.setClassIndex(this.streamHeader.numAttributes() - 1);
+  }
+
+  @Override
+  public long estimatedRemainingInstances() {
+    return -1;
+  }
+
+  @Override
+  public InstancesHeader getHeader() {
+    return this.streamHeader;
+  }
+
+  @Override
+  public boolean hasMoreInstances() {
+    return true;
+  }
+
+  @Override
+  public boolean isRestartable() {
+    return true;
+  }
+
+  /**
+   * Generates the next instance.
+   * <p>
+   * Attribute values are drawn uniformly from [0, 1). The label is 1 when the
+   * weighted sum of the attributes is at least half the sum of the weights and
+   * 0 otherwise; it is then flipped with the configured noise percentage.
+   * Afterwards the drifting weights are updated.
+   * NOTE(review): only labels 0 and 1 are ever produced, regardless of
+   * {@code numClassesOption}.
+   */
+  @Override
+  public Example<Instance> nextInstance() {
+
+    int numAtts = this.numAttsOption.getValue();
+    // one extra slot for the class value
+    double[] attVals = new double[numAtts + 1];
+    double sum = 0.0;
+    double sumWeights = 0.0;
+    for (int i = 0; i < numAtts; i++) {
+      attVals[i] = this.instanceRandom.nextDouble();
+      sum += this.weights[i] * attVals[i];
+      sumWeights += this.weights[i];
+    }
+    int classLabel;
+    if (sum >= sumWeights * 0.5) {
+      classLabel = 1;
+    } else {
+      classLabel = 0;
+    }
+    // Add Noise
+    if ((1 + (this.instanceRandom.nextInt(100))) <= this.noisePercentageOption.getValue()) {
+      classLabel = (classLabel == 0 ? 1 : 0);
+    }
+
+    Instance inst = new DenseInstance(1.0, attVals);
+    inst.setDataset(getHeader());
+    inst.setClassValue(classLabel);
+    addDrift();
+    return new InstanceExample(inst);
+  }
+
+  /**
+   * Moves every drifting weight by {@code magChange} in its current direction
+   * and, with the configured sigma percentage, reverses that direction.
+   */
+  private void addDrift() {
+    // Never index past the weight/sigma arrays, even when more drifting
+    // attributes are requested than attributes exist.
+    final int driftAtts = Math.min(this.numDriftAttsOption.getValue(), this.weights.length);
+    for (int i = 0; i < driftAtts; i++) {
+      this.weights[i] += this.sigma[i] * this.magChangeOption.getValue();
+      // (reversing the direction when a weight leaves [0, 1] is disabled)
+      if ((1 + (this.instanceRandom.nextInt(100))) <= this.sigmaPercentageOption.getValue()) {
+        this.sigma[i] *= -1;
+      }
+    }
+  }
+
+  /**
+   * Re-seeds the random number generator, draws fresh hyperplane weights and
+   * resets the drift direction of the drifting attributes to 1.
+   */
+  @Override
+  public void restart() {
+    this.instanceRandom = new Random(this.instanceRandomSeedOption.getValue());
+    this.weights = new double[this.numAttsOption.getValue()];
+    this.sigma = new int[this.numAttsOption.getValue()];
+    for (int i = 0; i < this.numAttsOption.getValue(); i++) {
+      this.weights[i] = this.instanceRandom.nextDouble();
+      this.sigma[i] = (i < this.numDriftAttsOption.getValue() ? 1 : 0);
+    }
+  }
+
+  @Override
+  public void getDescription(StringBuilder sb, int indent) {
+    // TODO Auto-generated method stub
+  }
+}


Reply via email to