SAMOA-43: Add WriteArffFile
Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/4851dd9f Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/4851dd9f Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/4851dd9f Branch: refs/heads/master Commit: 4851dd9fd515c11eb212ac340fe0a536a6dd1e73 Parents: 473a419 Author: abifet <[email protected]> Authored: Mon Sep 14 16:46:39 2015 +0200 Committer: Albert Bifet <[email protected]> Committed: Wed Mar 16 15:33:36 2016 +0100 ---------------------------------------------------------------------- .../org/apache/samoa/tasks/WriteArffFile.java | 142 +++++++++++++++++++ 1 file changed, 142 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/4851dd9f/samoa-api/src/main/java/org/apache/samoa/tasks/WriteArffFile.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/tasks/WriteArffFile.java b/samoa-api/src/main/java/org/apache/samoa/tasks/WriteArffFile.java new file mode 100644 index 0000000..d792b11 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/tasks/WriteArffFile.java @@ -0,0 +1,142 @@ +package org.apache.samoa.tasks; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import com.github.javacliparser.*; +import org.apache.samoa.moa.options.AbstractOptionHandler; +import org.apache.samoa.moa.streams.InstanceStream; +import org.apache.samoa.moa.streams.clustering.RandomRBFGeneratorEvents; +import org.apache.samoa.streams.PrequentialSourceProcessor; +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.Writer; + +/** + * A task that runs and evaluates a distributed clustering algorithm. + */ +public class WriteArffFile implements Task, Configurable { + + + private static final long serialVersionUID = -8246537378371580524L; + + private static final Logger logger = LoggerFactory.getLogger(WriteArffFile.class); + + public ClassOption streamOption = new ClassOption("stream", 's', "Input stream.", InstanceStream.class, + RandomRBFGeneratorEvents.class.getName()); + + public FileOption arffFileOption = new FileOption("arffFile", 'f', + "Destination ARFF file.", null, "arff", true); + + public IntOption maxInstancesOption = new IntOption("maxInstances", 'm', + "Maximum number of instances to write to file.", 10000000, 0, + Integer.MAX_VALUE); + + public FlagOption suppressHeaderOption = new FlagOption("suppressHeader", + 'h', "Suppress header from output."); + + + public void getDescription(StringBuilder sb) { + sb.append("Writing a stream to an ARFF File"); + } + + @Override + public void init() { + + if (builder == null) { + logger.warn("Builder was not initialized, initializing it from the Task"); + + builder = new TopologyBuilder(); + logger.debug("Successfully instantiating TopologyBuilder"); + + builder.initTopology(arffFileOption.getValue()); + logger.debug("Successfully initializing SAMOA topology with name {}", arffFileOption.getValue()); + } + + InstanceStream stream = this.streamOption.getValue(); + initStream(stream); + + preqSource = new PrequentialSourceProcessor(); + preqSource.setStreamSource(stream); + preqSource.setMaxNumInstances(this.maxInstancesOption.getValue()); + builder.addEntranceProcessor(preqSource); + + File destFile = this.arffFileOption.getFile(); + if (destFile != null) { + try { + Writer w = new BufferedWriter(new FileWriter(destFile)); + if (!this.suppressHeaderOption.isSet()) { + w.write(stream.getHeader().toStringArff()); + w.write("\n"); + } + int numWritten = 0; + while ((numWritten < this.maxInstancesOption.getValue()) + && stream.hasMoreInstances()) { + w.write(stream.nextInstance().getData().toString()); + w.write("\n"); + numWritten++; + } + w.close(); + } catch (Exception ex) { + throw new RuntimeException( + "Failed writing to file " + destFile, ex); + } + this.topology = builder.build(); + return; + } + throw new IllegalArgumentException("No destination file to write to."); + } + + private Topology topology; + + private TopologyBuilder builder; + + private PrequentialSourceProcessor preqSource; + + @Override + public void setFactory(ComponentFactory factory) { + // TODO unify this code with init() for now, it's used by S4 App + // dynamic binding theoretically will solve this problem + builder = new TopologyBuilder(factory); + logger.debug("Successfully instantiated TopologyBuilder"); + + builder.initTopology(arffFileOption.getValue()); + logger.debug("Successfully initialized SAMOA topology with name {}", arffFileOption.getValue()); + + } + + public Topology getTopology() { + return topology; + } + + private void initStream(InstanceStream stream) { + if (stream instanceof AbstractOptionHandler) { + ((AbstractOptionHandler) (stream)).prepareForUse(); + } + + } +}
