http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
new file mode 100755
index 0000000..aebb136
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/topology/TopologyBuilder.java
@@ -0,0 +1,345 @@
+package com.yahoo.labs.samoa.topology;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.yahoo.labs.samoa.core.EntranceProcessor;
+import com.yahoo.labs.samoa.core.Processor;
+
+/**
+ * Builder class that creates topology components and assembles them together.
+ * <p>
+ * {@link #initTopology(String)} must be called first: every method that adds a processor or
+ * creates a stream also adds the new component to that topology. Components are created by the
+ * {@link ComponentFactory} passed to {@link #TopologyBuilder(ComponentFactory)}.
+ */
+public class TopologyBuilder {
+
+  // TODO:
+  // Possible options:
+  // 1. we may convert this as interface and platform dependent builder will inherit this method
+  // 2. refactor by combining TopologyBuilder, ComponentFactory and Topology
+  // -ve -> fat class where it has capabilities to instantiate specific component and connecting them
+  // +ve -> easy abstraction for SAMOA developer "you just implement your builder logic here!"
+
+  private ComponentFactory componentFactory;
+  private Topology topology;
+  // Every processor added to this builder, mapped to its processing item. Created eagerly so that
+  // looking up an unknown processor fails with the precondition message rather than a bare NPE.
+  private final Map<Processor, IProcessingItem> mapProcessorToProcessingItem =
+      new HashMap<Processor, IProcessingItem>();
+
+  // TODO: refactor, temporary constructor used by Storm code
+  /**
+   * Creates a builder without a {@link ComponentFactory}.
+   * <p>
+   * NOTE: the factory stays {@code null} with this constructor, so {@link #initTopology(String)}
+   * throws a {@link NullPointerException}; prefer {@link #TopologyBuilder(ComponentFactory)}.
+   */
+  public TopologyBuilder() {
+    // TODO: initialize _componentFactory using dynamic binding
+    // for now, use StormComponentFactory
+    // should the factory be Singleton (?)
+    // ans: at the moment, no, i.e. each builder will has its associated factory!
+    // and the factory will be instantiated using dynamic binding
+    // this.componentFactory = new StormComponentFactory();
+  }
+
+  // TODO: refactor, temporary constructor used by S4 code
+  /**
+   * Creates a builder that uses {@code theFactory} to create topologies, processing items and
+   * streams.
+   *
+   * @param theFactory platform-specific component factory
+   */
+  public TopologyBuilder(ComponentFactory theFactory) {
+    this.componentFactory = theFactory;
+  }
+
+  /**
+   * Initiates topology with a specific name and a delay of 0.
+   *
+   * @param topologyName name passed to the component factory
+   */
+  public void initTopology(String topologyName) {
+    this.initTopology(topologyName, 0);
+  }
+
+  /**
+   * Initiates topology with a specific name and a delay between consecutive instances.
+   * <p>
+   * If a topology already exists, a message is printed and the existing topology is kept.
+   * <p>
+   * NOTE(review): {@code delay} is not used by this method, so it currently has no effect.
+   * TODO confirm where the injection delay is meant to be applied.
+   *
+   * @param topologyName name passed to the component factory
+   * @param delay delay between injections of two instances from source (in milliseconds)
+   * @throws NullPointerException if this builder has no {@link ComponentFactory}
+   */
+  public void initTopology(String topologyName, int delay) {
+    if (this.topology != null) {
+      // TODO: possible refactor this code later
+      System.out.println("Topology has been initialized before!");
+      return;
+    }
+    Preconditions.checkNotNull(this.componentFactory,
+        "No ComponentFactory set: create the builder with TopologyBuilder(ComponentFactory)");
+    this.topology = this.componentFactory.createTopology(topologyName);
+  }
+
+  /**
+   * Returns the platform specific topology.
+   *
+   * @return the topology created by {@link #initTopology(String, int)}, or {@code null} if it has
+   *         not been initialized yet
+   */
+  public Topology build() {
+    return topology;
+  }
+
+  /**
+   * Wraps {@code processor} in a new processing item, adds the item to the topology and registers
+   * it so that the processor can be wired with the other methods of this builder.
+   *
+   * @param processor the processor to add
+   * @param parallelism parallelism level of the processing item
+   * @return the new processing item
+   * @throws NullPointerException if the topology has not been initialized
+   */
+  public ProcessingItem addProcessor(Processor processor, int parallelism) {
+    ProcessingItem pi = createPi(processor, parallelism);
+    this.mapProcessorToProcessingItem.put(processor, pi);
+    return pi;
+  }
+
+  /**
+   * Same as {@link #addProcessor(Processor, int)} with a parallelism level of 1.
+   *
+   * @param processor the processor to add
+   * @return the new processing item
+   */
+  public ProcessingItem addProcessor(Processor processor) {
+    return addProcessor(processor, 1);
+  }
+
+  /**
+   * Connects {@code inputStream} to the processing item of {@code processor} through
+   * {@link ProcessingItem#connectInputShuffleStream(Stream)}.
+   *
+   * @return the processing item of {@code processor}
+   * @throws NullPointerException if {@code processor} was not added to this builder
+   */
+  public ProcessingItem connectInputShuffleStream(Stream inputStream, Processor processor) {
+    ProcessingItem pi = getRegisteredPi(processor, "Trying to connect to null PI");
+    return pi.connectInputShuffleStream(inputStream);
+  }
+
+  /**
+   * Connects {@code inputStream} to the processing item of {@code processor} through
+   * {@link ProcessingItem#connectInputKeyStream(Stream)}.
+   *
+   * @return the processing item of {@code processor}
+   * @throws NullPointerException if {@code processor} was not added to this builder
+   */
+  public ProcessingItem connectInputKeyStream(Stream inputStream, Processor processor) {
+    ProcessingItem pi = getRegisteredPi(processor, "Trying to connect to null PI");
+    return pi.connectInputKeyStream(inputStream);
+  }
+
+  /**
+   * Connects {@code inputStream} to the processing item of {@code processor} through
+   * {@link ProcessingItem#connectInputAllStream(Stream)}.
+   *
+   * @return the processing item of {@code processor}
+   * @throws NullPointerException if {@code processor} was not added to this builder
+   */
+  public ProcessingItem connectInputAllStream(Stream inputStream, Processor processor) {
+    ProcessingItem pi = getRegisteredPi(processor, "Trying to connect to null PI");
+    return pi.connectInputAllStream(inputStream);
+  }
+
+  /**
+   * Creates a stream whose source is the processing item of {@code dest} and connects it to the
+   * processing item of {@code processor} through
+   * {@link ProcessingItem#connectInputShuffleStream(Stream)}.
+   * <p>
+   * {@code processor} is looked up before the stream is created, so a failed call does not leave
+   * an unconnected stream in the topology.
+   *
+   * @return the new stream
+   * @throws NullPointerException if either processor was not added to this builder
+   */
+  public Stream createInputShuffleStream(Processor processor, Processor dest) {
+    ProcessingItem pi = getRegisteredPi(processor, "Trying to connect to null PI");
+    Stream inputStream = this.createStream(dest);
+    pi.connectInputShuffleStream(inputStream);
+    return inputStream;
+  }
+
+  /**
+   * Same as {@link #createInputShuffleStream(Processor, Processor)}, but connects the stream
+   * through {@link ProcessingItem#connectInputKeyStream(Stream)}.
+   */
+  public Stream createInputKeyStream(Processor processor, Processor dest) {
+    ProcessingItem pi = getRegisteredPi(processor, "Trying to connect to null PI");
+    Stream inputStream = this.createStream(dest);
+    pi.connectInputKeyStream(inputStream);
+    return inputStream;
+  }
+
+  /**
+   * Same as {@link #createInputShuffleStream(Processor, Processor)}, but connects the stream
+   * through {@link ProcessingItem#connectInputAllStream(Stream)}.
+   */
+  public Stream createInputAllStream(Processor processor, Processor dest) {
+    ProcessingItem pi = getRegisteredPi(processor, "Trying to connect to null PI");
+    Stream inputStream = this.createStream(dest);
+    pi.connectInputAllStream(inputStream);
+    return inputStream;
+  }
+
+  /**
+   * Creates a stream whose source is the processing item of {@code processor} and adds it to the
+   * topology. If that item is an {@link EntranceProcessingItem}, the stream is also set as its
+   * output stream.
+   *
+   * @return the new stream
+   * @throws NullPointerException if {@code processor} was not added to this builder
+   */
+  public Stream createStream(Processor processor) {
+    IProcessingItem pi = this.mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, "Trying to create stream from null PI");
+    Stream stream = this.createStream(pi);
+    if (pi instanceof EntranceProcessingItem) {
+      ((EntranceProcessingItem) pi).setOutputStream(stream);
+    }
+    return stream;
+  }
+
+  /**
+   * Wraps {@code entranceProcessor} in a new entrance processing item, adds it to the topology and
+   * registers it so that streams can be created from it with {@link #createStream(Processor)}.
+   *
+   * @param entranceProcessor the entrance processor to add
+   * @return the new entrance processing item
+   * @throws NullPointerException if the topology has not been initialized
+   */
+  public EntranceProcessingItem addEntranceProcessor(EntranceProcessor entranceProcessor) {
+    EntranceProcessingItem pi = createEntrancePi(entranceProcessor);
+    this.mapProcessorToProcessingItem.put(entranceProcessor, pi);
+    return pi;
+  }
+
+  /**
+   * Returns the processing item registered for {@code processor}.
+   *
+   * @throws NullPointerException if {@code processor} was not added to this builder
+   * @throws IllegalArgumentException if the registered item is not a {@link ProcessingItem}
+   */
+  public ProcessingItem getProcessingItem(Processor processor) {
+    return getRegisteredPi(processor, "Trying to retrieve null PI");
+  }
+
+  /**
+   * Looks up the {@link ProcessingItem} registered for {@code processor}.
+   *
+   * @param processor the processor to look up
+   * @param nullMessage exception message used when nothing is registered for {@code processor}
+   * @return the registered processing item
+   * @throws NullPointerException if nothing is registered for {@code processor}
+   * @throws IllegalArgumentException if the registered item is not a {@link ProcessingItem}
+   */
+  private ProcessingItem getRegisteredPi(Processor processor, String nullMessage) {
+    IProcessingItem pi = this.mapProcessorToProcessingItem.get(processor);
+    Preconditions.checkNotNull(pi, nullMessage);
+    // Checked explicitly (instead of a blind cast) so the error names the actual item type.
+    Preconditions.checkArgument(pi instanceof ProcessingItem,
+        "Processor is registered with a %s, not a ProcessingItem", pi.getClass().getName());
+    return (ProcessingItem) pi;
+  }
+
+  /**
+   * Creates a processing item with a specific processor and parallelism level of 1.
+   *
+   * @param processor the processor to wrap
+   * @return ProcessingItem
+   */
+  @SuppressWarnings("unused")
+  private ProcessingItem createPi(Processor processor) {
+    return createPi(processor, 1);
+  }
+
+  /**
+   * Creates a processing item with a specific processor and parallelism level, and adds it to the
+   * topology.
+   *
+   * @param processor the processor to wrap
+   * @param parallelism parallelism level of the processing item
+   * @return ProcessingItem
+   * @throws NullPointerException if the topology has not been initialized
+   */
+  private ProcessingItem createPi(Processor processor, int parallelism) {
+    Topology t = requireTopology();
+    ProcessingItem pi = this.componentFactory.createPi(processor, parallelism);
+    t.addProcessingItem(pi, parallelism);
+    return pi;
+  }
+
+  /**
+   * Creates a platform specific entrance processing item and adds it to the topology. Registering
+   * it in the processor map is left to {@link #addEntranceProcessor(EntranceProcessor)}.
+   *
+   * @param processor the entrance processor to wrap
+   * @return the new entrance processing item
+   * @throws NullPointerException if the topology has not been initialized
+   */
+  private EntranceProcessingItem createEntrancePi(EntranceProcessor processor) {
+    Topology t = requireTopology();
+    EntranceProcessingItem epi = this.componentFactory.createEntrancePi(processor);
+    t.addEntranceProcessingItem(epi);
+    return epi;
+  }
+
+  /**
+   * Creates a platform specific stream and adds it to the topology.
+   *
+   * @param sourcePi source processing item.
+   * @return the new stream
+   * @throws NullPointerException if the topology has not been initialized
+   */
+  private Stream createStream(IProcessingItem sourcePi) {
+    Topology t = requireTopology();
+    Stream stream = this.componentFactory.createStream(sourcePi);
+    t.addStream(stream);
+    return stream;
+  }
+
+  /**
+   * Returns the topology, failing with a descriptive message if it has not been initialized.
+   */
+  private Topology requireTopology() {
+    return Preconditions.checkNotNull(this.topology,
+        "Topology has not been initialized; call initTopology() first");
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
new file mode 100644
index 0000000..ac6fc3f
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/PartitioningScheme.java
@@ -0,0 +1,32 @@
+package com.yahoo.labs.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+/**
+ * Represents the 3 schemes to partition the streams
+ * @author Anh Thu Vu
+ *
+ */
+public enum PartitioningScheme {
+  SHUFFLE, GROUP_BY_KEY, BROADCAST
+}
+// TODO: use this enum in S4
+// Storm doesn't seem to need this
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
new file mode 100644
index 0000000..2781fb8
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/StreamDestination.java
@@ -0,0 +1,70 @@
+package com.yahoo.labs.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import com.yahoo.labs.samoa.topology.IProcessingItem;
+
+/**
+ * Represents one destination for streams. It has the info of:
+ * the ProcessingItem, parallelismHint, and partitioning scheme.
+ * Usage:
+ * - When ProcessingItem connects to a stream, it will pass
+ * a StreamDestination to the stream.
+ * - Stream manages a set of StreamDestination.
+ * - Used in single-threaded and multi-threaded local mode.
+ * <p>
+ * All fields are final and set once by the constructor.
+ * @author Anh Thu Vu
+ *
+ */
+public class StreamDestination {
+  private final IProcessingItem pi;
+  private final int parallelism;
+  private final PartitioningScheme type;
+
+  /**
+   * Creates a destination description.
+   *
+   * @param pi the destination processing item
+   * @param parallelismHint parallelism of the destination processing item
+   * @param type partitioning scheme of the stream towards this destination
+   */
+  public StreamDestination(IProcessingItem pi, int parallelismHint, PartitioningScheme type) {
+    this.pi = pi;
+    this.parallelism = parallelismHint;
+    this.type = type;
+  }
+
+  /** @return the destination processing item */
+  public IProcessingItem getProcessingItem() {
+    return this.pi;
+  }
+
+  /** @return the parallelism hint passed to the constructor */
+  public int getParallelism() {
+    return this.parallelism;
+  }
+
+  /** @return the partitioning scheme passed to the constructor */
+  public PartitioningScheme getPartitioningScheme() {
+    return this.type;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
new file mode 100644
index 0000000..ce18d78
--- /dev/null
+++ b/samoa-api/src/main/java/com/yahoo/labs/samoa/utils/Utils.java
@@ -0,0 +1,195 @@
+package com.yahoo.labs.samoa.utils;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.jar.Attributes;
+import java.util.jar.JarOutputStream;
+import java.util.jar.Manifest;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.zip.ZipEntry;
+
+/**
+ * Utils class for building and deploying applications programmatically.
+ * @author severien
+ *
+ */
+public class Utils {
+
+  /**
+   * Builds {@code /tmp/samoa/samoa.jar} with the manifest from {@link #createManifest()}. The jar
+   * contains {@code target/samoa-0.0.1-SNAPSHOT.jar} (stored under {@code /app/}) and the entries
+   * of {@code target/lib}, both resolved against the {@code user.dir} system property.
+   * <p>
+   * I/O errors are printed instead of being thrown.
+   */
+  public static void buildSamoaPackage() {
+    String output = "/tmp/samoa/samoa.jar";// System.getProperty("user.home") + "/samoa.jar";
+    // FileOutputStream does not create missing parent directories.
+    new File(output).getParentFile().mkdirs();
+    // Both streams are closed even if adding an entry fails (closing twice is harmless).
+    try (BufferedOutputStream bo = new BufferedOutputStream(new FileOutputStream(output));
+        JarOutputStream jo = new JarOutputStream(bo, createManifest())) {
+      String baseDir = System.getProperty("user.dir");
+      System.out.println(baseDir);
+
+      File samoaJar = new File(baseDir + "/target/samoa-0.0.1-SNAPSHOT.jar");
+      addEntry(jo, samoaJar, baseDir + "/target/", "/app/");
+      addLibraries(jo);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  // TODO should get the modules file from the parameters
+  /**
+   * Builds {@code modules.jar} in the {@code user.home} directory from the given classes.
+   *
+   * @param modulesNames fully qualified class names; each is read from
+   *          {@code <user.dir>/<name with '.' replaced by '/'>.class}
+   */
+  public static void buildModulesPackage(List<String> modulesNames) {
+    System.out.println(System.getProperty("user.dir"));
+    String baseDir = System.getProperty("user.dir");
+    List<File> filesArray = new ArrayList<>();
+    for (String module : modulesNames) {
+      filesArray.add(new File(baseDir + "/" + module.replace(".", "/") + ".class"));
+    }
+    String output = System.getProperty("user.home") + "/modules.jar";
+
+    try (BufferedOutputStream bo = new BufferedOutputStream(new FileOutputStream(output));
+        JarOutputStream jo = new JarOutputStream(bo, createBaseManifest())) {
+      addEntries(jo, filesArray.toArray(new File[filesArray.size()]), baseDir, "");
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Adds every entry of {@code <user.dir>/target/lib} to {@code jo}. The caller owns {@code jo}
+   * and is responsible for closing it.
+   *
+   * @throws FileNotFoundException if {@code target/lib} is missing, not a directory, or unreadable
+   */
+  private static void addLibraries(JarOutputStream jo) throws FileNotFoundException {
+    String baseDir = System.getProperty("user.dir");
+    File libDir = new File(baseDir + "/target/lib");
+    File[] files = libDir.listFiles();
+    if (files == null) {
+      throw new FileNotFoundException("Library directory not found or not readable: " + libDir);
+    }
+    for (File file : files) {
+      addEntry(jo, file, baseDir, "lib");
+    }
+  }
+
+  /**
+   * Recursively adds {@code files} to {@code jo}; a directory contributes its contents, not an
+   * entry of its own. Unreadable directories are skipped.
+   */
+  private static void addEntries(JarOutputStream jo, File[] files, String baseDir, String rootDir) {
+    for (File file : files) {
+      if (!file.isDirectory()) {
+        addEntry(jo, file, baseDir, rootDir);
+      } else {
+        // listFiles() returns null when the directory cannot be read.
+        File[] children = file.listFiles();
+        if (children != null) {
+          addEntries(jo, children, baseDir, rootDir);
+        }
+      }
+    }
+  }
+
+  /**
+   * Copies {@code file} into {@code jo}. The entry name is the file's absolute path with the first
+   * occurrence of {@code baseDir} replaced by {@code rootDir}, both taken literally. I/O errors are
+   * printed instead of being thrown.
+   */
+  private static void addEntry(JarOutputStream jo, File file, String baseDir, String rootDir) {
+    // Quote both arguments: file paths are not regular expressions (e.g. '\' on Windows, '$').
+    String path = file.getAbsolutePath().replaceFirst(Pattern.quote(baseDir),
+        Matcher.quoteReplacement(rootDir));
+    try (BufferedInputStream bi = new BufferedInputStream(new FileInputStream(file))) {
+      jo.putNextEntry(new ZipEntry(path));
+      byte[] buf = new byte[1024];
+      int anz;
+      while ((anz = bi.read(buf)) != -1) {
+        jo.write(buf, 0, anz);
+      }
+      jo.closeEntry();
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  /**
+   * Creates the manifest used by {@link #buildSamoaPackage()}: the common SAMOA attributes plus
+   * {@code S4-App-Class} and {@code S4-Version}.
+   */
+  public static Manifest createManifest() {
+    Manifest manifest = createBaseManifest();
+    Attributes main = manifest.getMainAttributes();
+    main.put(new Attributes.Name("S4-App-Class"), "samoa.topology.impl.DoTaskApp");
+    main.put(new Attributes.Name("S4-Version"), "0.6.0-incubating");
+    return manifest;
+  }
+
+  /**
+   * Creates a manifest with the version, URL and vendor attributes shared by all jars built here.
+   */
+  private static Manifest createBaseManifest() {
+    Manifest manifest = new Manifest();
+    Attributes main = manifest.getMainAttributes();
+    main.put(Attributes.Name.MANIFEST_VERSION, "1.0");
+    main.put(Attributes.Name.IMPLEMENTATION_URL, "http://samoa.yahoo.com");
+    main.put(Attributes.Name.IMPLEMENTATION_VERSION, "0.1");
+    main.put(Attributes.Name.IMPLEMENTATION_VENDOR, "Yahoo");
+    main.put(Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA");
+    return manifest;
+  }
+
+  /**
+   * Instantiates {@code className} through its no-argument constructor.
+   *
+   * @param className fully qualified class name
+   * @return the new instance, or {@code null} if the class cannot be loaded or instantiated (the
+   *         cause is printed)
+   */
+  public static Object getInstance(String className) {
+    try {
+      // Class.newInstance() is deprecated: it rethrows the constructor's checked exceptions unwrapped.
+      return Class.forName(className).getDeclaredConstructor().newInstance();
+    } catch (ReflectiveOperationException e) {
+      e.printStackTrace();
+      return null;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
new file mode 100644
index 0000000..f82588b
--- /dev/null
+++ b/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java
@@ -0,0 +1,97 @@
+package com.yahoo.labs.samoa.core;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import static org.hamcrest.CoreMatchers.*;
+import static org.junit.Assert.*;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class DoubleVectorTest {
+  private DoubleVector emptyVector, array5Vector;
+
+  @Before
+  public void setUp() {
+    emptyVector = new DoubleVector();
+    array5Vector = new DoubleVector(new double[] { 1.1, 2.5, 0, 4.7, 0 });
+  }
+
+  @Test
+  public void testGetArrayRef() {
+    assertThat(emptyVector.getArrayRef(), notNullValue());
+    assertSame(emptyVector.getArrayRef(), emptyVector.getArrayRef());
+    assertEquals(5, array5Vector.getArrayRef().length);
+  }
+
+  @Test
+  public void testGetArrayCopy() {
+    double[] arrayRef;
+    arrayRef = emptyVector.getArrayRef();
+    assertNotSame(arrayRef, emptyVector.getArrayCopy());
+    assertThat(arrayRef, is(equalTo(emptyVector.getArrayCopy())));
+
+    arrayRef = array5Vector.getArrayRef();
+    assertNotSame(arrayRef, array5Vector.getArrayCopy());
+    assertThat(arrayRef, is(equalTo(array5Vector.getArrayCopy())));
+  }
+
+  @Test
+  public void testNumNonZeroEntries() {
+    assertEquals(0, emptyVector.numNonZeroEntries());
+    assertEquals(3, array5Vector.numNonZeroEntries());
+  }
+
+  @Test(expected = IndexOutOfBoundsException.class)
+  public void testGetValueOutOfBound() {
+    @SuppressWarnings("unused")
+    double value = emptyVector.getArrayRef()[0];
+  }
+
+  @Test
+  public void testSetValue() {
+    // test automatic vector enlargement
+    emptyVector.setValue(0, 1.0);
+    assertEquals(1, emptyVector.getArrayRef().length);
+    assertEquals(1.0, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0
+
+    emptyVector.setValue(5, 5.5);
+    assertEquals(6, emptyVector.getArrayRef().length);
+    assertEquals(2, emptyVector.numNonZeroEntries());
+    assertEquals(5.5, emptyVector.getArrayRef()[5], 0.0); // should be exactly the same, so delta=0.0
+  }
+
+  @Test
+  public void testAddToValue() {
+    array5Vector.addToValue(2, 5.0);
+    assertEquals(5, array5Vector.getArrayRef()[2], 0.0); // should be exactly the same, so delta=0.0
+
+    // test automatic vector enlargement
+    emptyVector.addToValue(0, 1.0);
+    assertEquals(1, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0
+  }
+
+  @Test
+  public void testSumOfValues() {
+    assertEquals(1.1 + 2.5 + 4.7, array5Vector.sumOfValues(), Double.MIN_NORMAL);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
new file mode 100644
index 0000000..04f3184
--- /dev/null
+++ b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java
@@ -0,0 +1,298 @@
+package com.yahoo.labs.samoa.streams.fs;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 - 2014 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.BufferedWriter;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.HashSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.Builder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.*;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class HDFSFileStreamSourceTest {
+
+  private static final String[] HOSTS = {"localhost"};
+  private static final String BASE_DIR = "/minidfsTest";
+  private static final int NUM_FILES_IN_DIR = 4;
+  private static final int NUM_NOISE_FILES_IN_DIR = 2;
+
+  private HDFSFileStreamSource streamSource;
+
+  private Configuration config;
+  private MiniDFSCluster hdfsCluster;
+  private String hdfsURI;
+
+  @Before
+  public void setUp() throws Exception {
+    // Start MiniDFSCluster
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration())
+        .hosts(HOSTS).numDataNodes(1).format(true);
+    hdfsCluster = builder.build();
+    hdfsCluster.waitActive();
+    hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort();
+
+    // Construct stream source
+    streamSource = new HDFSFileStreamSource();
+
+    // General config
+    config = new Configuration();
+    config.set("fs.defaultFS", hdfsURI);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hdfsCluster.shutdown();
+  }
+
+  /*
+   * Init tests
+   */
+  @Test
+  public void testInitWithSingleFileAndExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    streamSource.init(config, BASE_DIR + "/1.txt", "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    // The source may report the path with or without the hdfs://host:port prefix.
+    assertTrue("Incorrect file in filePaths.",
+        fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "/1.txt"));
+  }
+
+  @Test
+  public void testInitWithSingleFileAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, "txt", 1);
+
+    // init with path to input file
+    streamSource.init(config, BASE_DIR + "/1.txt", null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0);
+    String fn = streamSource.getFilePathAt(0);
+    assertTrue("Incorrect file in filePaths.",
+        fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "/1.txt"));
+  }
+
+  @Test
+  public void testInitWithFolderAndExtension() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+    writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR,
+        streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String targetFn = BASE_DIR + "/" + Integer.toString(i) + ".txt";
+      filenames.add(targetFn);
+      filenames.add(hdfsURI + targetFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  @Test
+  public void testInitWithFolderAndNullExtension() {
+    // write input file
+    writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, null);
+
+    // assertions
+    assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR,
+        streamSource.getFilePathListSize(), 0);
+    Set<String> filenames = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      String targetFn = BASE_DIR + "/" + Integer.toString(i);
+      filenames.add(targetFn);
+      filenames.add(hdfsURI + targetFn);
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      String fn = streamSource.getFilePathAt(i);
+      assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn));
+    }
+  }
+
+  /*
+   * getNextInputStream tests
+   */
+  @Test
+  public void testGetNextInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // call getNextInputStream & assertions
+    Set<String> contents = new HashSet<String>();
+    for (int i = 1; i <= NUM_FILES_IN_DIR; i++) {
+      contents.add(Integer.toString(i));
+    }
+    for (int i = 0; i < NUM_FILES_IN_DIR; i++) {
+      InputStream inStream = streamSource.getNextInputStream();
+      assertNotNull("Unexpected end of input stream list.", inStream);
+
+      BufferedReader rd = new BufferedReader(new InputStreamReader(inStream));
+      String inputRead = null;
+      try {
+        inputRead = rd.readLine();
+      } catch (IOException ioe) {
+        fail("Fail reading from stream at index:" + i + ioe.getMessage());
+      }
+      // remove() doubles as the membership check, so every content must be seen exactly once.
+      assertTrue("File content is incorrect.", contents.remove(inputRead));
+    }
+
+    // assert that another call to getNextInputStream will return null
+    assertNull("Call getNextInputStream after the last file did not return null.",
+        streamSource.getNextInputStream());
+  }
+
+  /*
+   * getCurrentInputStream tests
+   */
+  @Test
+  public void testGetCurrentInputStream() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // call getNextInputStream, getCurrentInputStream & assertions
+    for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list
+      InputStream inStream1 = streamSource.getNextInputStream();
+      InputStream inStream2 = streamSource.getCurrentInputStream();
+      assertSame("Incorrect current input stream.", inStream1, inStream2);
+    }
+  }
+
+  /*
+   * reset tests
+   */
+  @Test
+  public void testReset() {
+    // write input files & noise files
+    writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR);
+
+    // init with path to input dir
+    streamSource.init(config, BASE_DIR, "txt");
+
+    // Get the first input string
+    InputStream firstInStream = streamSource.getNextInputStream();
+    String firstInput = null;
+    assertNotNull("Unexpected end of input stream list.", firstInStream);
+
+    BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream));
+    try {
+      firstInput = rd1.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+
+    // call getNextInputStream a few times
+    streamSource.getNextInputStream();
+
+    // call reset, call next, assert that output is 1 (the first file)
+    try {
+      streamSource.reset();
+    } catch (IOException ioe) {
+      fail("Fail resetting stream source." + ioe.getMessage());
+    }
+
+    InputStream inStream = streamSource.getNextInputStream();
+    assertNotNull("Unexpected end of input stream list.", inStream);
+
+    BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream));
+    String inputRead = null;
+    try {
+      inputRead = rd2.readLine();
+    } catch (IOException ioe) {
+      fail("Fail reading from stream at index:0" + ioe.getMessage());
+    }
+    assertEquals("File content is incorrect.", firstInput, inputRead);
+  }
+
+  private void writeSimpleFiles(String path, String ext, int numOfFiles) {
+    // get filesystem
+    FileSystem dfs;
+    try {
+      dfs = hdfsCluster.getFileSystem();
+    } catch (IOException ioe) {
+      fail("Could not access MiniDFSCluster" + ioe.getMessage());
+      return;
+    }
+
+    // create basedir
+    Path basedir = new Path(path);
+    try {
+      dfs.mkdirs(basedir);
+    } catch (IOException ioe) {
+      fail("Could not create DIR:" + path + "\n" + ioe.getMessage());
+      return;
+    }
+
+    // write files
+    for (int i = 1; i <= numOfFiles; i++) {
+      String fn = (ext != null) ? Integer.toString(i) + "." + ext : Integer.toString(i);
+      // try-with-resources closes the writer (and the HDFS stream it wraps) even if write() fails.
+      try (BufferedWriter wr =
+          new BufferedWriter(new OutputStreamWriter(dfs.create(new Path(path, fn))))) {
+        wr.write(Integer.toString(i));
+      } catch (IOException ioe) {
+        fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage());
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java
new file mode 100644
index 0000000..21ca378
--- /dev/null
+++
b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java @@ -0,0 +1,276 @@ +package com.yahoo.labs.samoa.streams.fs; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 - 2014 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.BufferedWriter; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.lang.SecurityException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; + +import static org.junit.Assert.*; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.apache.commons.io.FileUtils; + +public class LocalFileStreamSourceTest { + private static final String BASE_DIR = "localfsTest"; + private static final int NUM_FILES_IN_DIR = 4; + private static final int NUM_NOISE_FILES_IN_DIR = 2; + + private LocalFileStreamSource streamSource; + + @Before + public void setUp() throws Exception { + streamSource = new LocalFileStreamSource(); + + } + + @After + public void tearDown() throws Exception { + FileUtils.deleteDirectory(new File(BASE_DIR)); + } + + @Test + public void testInitWithSingleFileAndExtension() { + // write input file + writeSimpleFiles(BASE_DIR,"txt",1); + + // init with path to input file 
+ File inFile = new File(BASE_DIR,"1.txt"); + String inFilePath = inFile.getAbsolutePath(); + streamSource.init(inFilePath, "txt"); + + //assertions + assertEquals("Size of filePaths is not correct.", 1,streamSource.getFilePathListSize(),0); + String fn = streamSource.getFilePathAt(0); + assertEquals("Incorrect file in filePaths.",inFilePath,fn); + } + + @Test + public void testInitWithSingleFileAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR,"txt",1); + + // init with path to input file + File inFile = new File(BASE_DIR,"1.txt"); + String inFilePath = inFile.getAbsolutePath(); + streamSource.init(inFilePath, null); + + //assertions + assertEquals("Size of filePaths is not correct.", 1,streamSource.getFilePathListSize(),0); + String fn = streamSource.getFilePathAt(0); + assertEquals("Incorrect file in filePaths.",inFilePath,fn); + } + + @Test + public void testInitWithFolderAndExtension() { + // write input file + writeSimpleFiles(BASE_DIR,null,NUM_NOISE_FILES_IN_DIR); + writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); + + // init with path to input dir + File inDir = new File(BASE_DIR); + String inDirPath = inDir.getAbsolutePath(); + streamSource.init(inDirPath, "txt"); + + //assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0); + Set<String> filenames = new HashSet<String>(); + for (int i=1; i<=NUM_FILES_IN_DIR; i++) { + String expectedFn = (new File(inDirPath,Integer.toString(i)+".txt")).getAbsolutePath(); + filenames.add(expectedFn); + } + for (int i=0; i< NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:"+fn,filenames.contains(fn)); + } + } + + @Test + public void testInitWithFolderAndNullExtension() { + // write input file + writeSimpleFiles(BASE_DIR,null,NUM_FILES_IN_DIR); + + // init with path to input dir + File inDir = new File(BASE_DIR); + String inDirPath = inDir.getAbsolutePath(); + 
streamSource.init(inDirPath, null); + + //assertions + assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR,streamSource.getFilePathListSize(),0); + Set<String> filenames = new HashSet<String>(); + for (int i=1; i<=NUM_FILES_IN_DIR; i++) { + String expectedFn = (new File(inDirPath,Integer.toString(i))).getAbsolutePath(); + filenames.add(expectedFn); + } + for (int i=0; i< NUM_FILES_IN_DIR; i++) { + String fn = streamSource.getFilePathAt(i); + assertTrue("Incorrect file in filePaths:"+fn,filenames.contains(fn)); + } + } + + /* + * getNextInputStream tests + */ + @Test + public void testGetNextInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(BASE_DIR, "txt"); + + // call getNextInputStream & assertions + Set<String> contents = new HashSet<String>(); + for (int i=1; i<=NUM_FILES_IN_DIR; i++) { + contents.add(Integer.toString(i)); + } + for (int i=0; i< NUM_FILES_IN_DIR; i++) { + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.",inStream); + + BufferedReader rd = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:"+i + ioe.getMessage()); + } + assertTrue("File content is incorrect.",contents.contains(inputRead)); + Iterator<String> it = contents.iterator(); + while (it.hasNext()) { + if (it.next().equals(inputRead)) { + it.remove(); + break; + } + } + } + + // assert that another call to getNextInputStream will return null + assertNull("Call getNextInputStream after the last file did not return null.",streamSource.getNextInputStream()); + } + + /* + * getCurrentInputStream tests + */ + public void testGetCurrentInputStream() { + // write input files & noise files + writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); + + // init with path to input 
dir + streamSource.init(BASE_DIR, "txt"); + + // call getNextInputStream, getCurrentInputStream & assertions + for (int i=0; i<= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list + InputStream inStream1 = streamSource.getNextInputStream(); + InputStream inStream2 = streamSource.getCurrentInputStream(); + assertSame("Incorrect current input stream.",inStream1, inStream2); + } + } + + /* + * reset tests + */ + public void testReset() { + // write input files & noise files + writeSimpleFiles(BASE_DIR,"txt",NUM_FILES_IN_DIR); + + // init with path to input dir + streamSource.init(BASE_DIR, "txt"); + + // Get the first input string + InputStream firstInStream = streamSource.getNextInputStream(); + String firstInput = null; + assertNotNull("Unexpected end of input stream list.",firstInStream); + + BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream)); + try { + firstInput = rd1.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + + // call getNextInputStream a few times + streamSource.getNextInputStream(); + + // call reset, call next, assert that output is 1 (the first file) + try { + streamSource.reset(); + } catch (IOException ioe) { + fail("Fail resetting stream source." 
+ ioe.getMessage()); + } + + InputStream inStream = streamSource.getNextInputStream(); + assertNotNull("Unexpected end of input stream list.",inStream); + + BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream)); + String inputRead = null; + try { + inputRead = rd2.readLine(); + } catch (IOException ioe) { + fail("Fail reading from stream at index:0" + ioe.getMessage()); + } + assertEquals("File content is incorrect.",firstInput,inputRead); + } + + private void writeSimpleFiles(String path, String ext, int numOfFiles) { + // Create folder + File folder = new File(path); + if (!folder.exists()) { + try{ + folder.mkdir(); + } catch(SecurityException se){ + fail("Failed creating directory:"+path+se); + } + } + + // Write files + for (int i=1; i<=numOfFiles; i++) { + String fn = null; + if (ext != null) { + fn = Integer.toString(i) + "."+ ext; + } else { + fn = Integer.toString(i); + } + + try { + FileWriter fwr = new FileWriter(new File(path,fn)); + fwr.write(Integer.toString(i)); + fwr.close(); + } catch (IOException ioe) { + fail("Fail writing to input file: "+ fn + " in directory: " + path + ioe.getMessage()); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/pom.xml ---------------------------------------------------------------------- diff --git a/samoa-instances/pom.xml b/samoa-instances/pom.xml new file mode 100644 index 0000000..98d2ebf --- /dev/null +++ b/samoa-instances/pom.xml @@ -0,0 +1,35 @@ +<!-- + #%L + SAMOA + %% + Copyright (C) 2013 Yahoo! Inc. + %% + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. + #L% + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + </properties> + + <name>samoa-instances</name> + <description>Instances for SAMOA</description> + + <artifactId>samoa-instances</artifactId> + <parent> + <groupId>com.yahoo.labs.samoa</groupId> + <artifactId>samoa</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> +</project> http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/ArffLoader.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/ArffLoader.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/ArffLoader.java new file mode 100644 index 0000000..9476ec0 --- /dev/null +++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/ArffLoader.java @@ -0,0 +1,364 @@ +package com.yahoo.labs.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +import java.io.BufferedReader; +import java.io.IOException; +import java.io.Reader; +import java.io.Serializable; +import java.io.StreamTokenizer; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * + * @author abifet + */ +public class ArffLoader implements Serializable { + + protected InstanceInformation instanceInformation; + + transient protected StreamTokenizer streamTokenizer; + + protected Reader reader; + + protected int size; + + protected int classAttribute; + + public ArffLoader() { + } + + public ArffLoader(Reader reader, int size, int classAttribute) { + this.reader = reader; + this.size = size; + this.classAttribute = classAttribute; + initStreamTokenizer(reader); + } + + public InstanceInformation getStructure() { + return this.instanceInformation; + } + + public Instance readInstance(Reader reader) { + if (streamTokenizer == null) { + initStreamTokenizer(reader); + } + while (streamTokenizer.ttype == StreamTokenizer.TT_EOL) { + try { + streamTokenizer.nextToken(); + } catch (IOException ex) { + Logger.getLogger(ArffLoader.class.getName()).log(Level.SEVERE, null, ex); + } + } + if (streamTokenizer.ttype == '{') { + return readInstanceSparse(); + // return readDenseInstanceSparse(); + } else { + return readInstanceDense(); + } + + } + + public Instance readInstanceDense() { + Instance instance = new DenseInstance(this.instanceInformation.numAttributes() + 1); + //System.out.println(this.instanceInformation.numAttributes()); + int numAttribute = 0; + try { + while (numAttribute == 0 && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + //For each line + while (streamTokenizer.ttype != StreamTokenizer.TT_EOL + && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + //For each item + if (streamTokenizer.ttype == StreamTokenizer.TT_NUMBER) { + //System.out.println(streamTokenizer.nval + "Num "); + this.setValue(instance, numAttribute, streamTokenizer.nval, 
true); + numAttribute++; + + } else if (streamTokenizer.sval != null && (streamTokenizer.ttype == StreamTokenizer.TT_WORD + || streamTokenizer.ttype == 34)) { + //System.out.println(streamTokenizer.sval + "Str"); + boolean isNumeric = attributes.get(numAttribute).isNumeric(); + double value; + if ("?".equals(streamTokenizer.sval)) { + value = Double.NaN; //Utils.missingValue(); + } else if (isNumeric == true) { + value = Double.valueOf(streamTokenizer.sval).doubleValue(); + } else { + value = this.instanceInformation.attribute(numAttribute).indexOfValue(streamTokenizer.sval); + } + + this.setValue(instance, numAttribute, value, isNumeric); + numAttribute++; + } + streamTokenizer.nextToken(); + } + streamTokenizer.nextToken(); + //System.out.println("EOL"); + } + + + } catch (IOException ex) { + Logger.getLogger(ArffLoader.class.getName()).log(Level.SEVERE, null, ex); + } + return (numAttribute > 0) ? instance : null; + } + + private void setValue(Instance instance, int numAttribute, double value, boolean isNumber) { + double valueAttribute; + if (isNumber && this.instanceInformation.attribute(numAttribute).isNominal) { + valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(Double.toString(value)); + //System.out.println(value +"/"+valueAttribute+" "); + + } else { + valueAttribute = value; + //System.out.println(value +"/"+valueAttribute+" "); + } + if (this.instanceInformation.classIndex() == numAttribute) { + instance.setClassValue(valueAttribute); + //System.out.println(value +"<"+this.instanceInformation.classIndex()+">"); + } else { + instance.setValue(numAttribute, valueAttribute); + } + } + + private Instance readInstanceSparse() { + //Return a Sparse Instance + Instance instance = new SparseInstance(1.0, null); //(this.instanceInformation.numAttributes() + 1); + //System.out.println(this.instanceInformation.numAttributes()); + int numAttribute; + ArrayList<Double> attributeValues = new ArrayList<Double>(); + List<Integer> 
indexValues = new ArrayList<Integer>(); + try { + //while (streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + streamTokenizer.nextToken(); // Remove the '{' char + //For each line + while (streamTokenizer.ttype != StreamTokenizer.TT_EOL + && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + while (streamTokenizer.ttype != '}') { + //For each item + //streamTokenizer.nextToken(); + //while (streamTokenizer.ttype != '}'){ + //System.out.println(streamTokenizer.nval +"-"+ streamTokenizer.sval); + //numAttribute = (int) streamTokenizer.nval; + if (streamTokenizer.ttype == StreamTokenizer.TT_NUMBER) { + numAttribute = (int) streamTokenizer.nval; + } else { + numAttribute = Integer.parseInt(streamTokenizer.sval); + } + streamTokenizer.nextToken(); + + if (streamTokenizer.ttype == StreamTokenizer.TT_NUMBER) { + //System.out.print(streamTokenizer.nval + " "); + this.setSparseValue(instance, indexValues, attributeValues, numAttribute, streamTokenizer.nval, true); + //numAttribute++; + + } else if (streamTokenizer.sval != null && (streamTokenizer.ttype == StreamTokenizer.TT_WORD + || streamTokenizer.ttype == 34)) { + //System.out.print(streamTokenizer.sval + "-"); + if (attributes.get(numAttribute).isNumeric()) { + this.setSparseValue(instance, indexValues, attributeValues, numAttribute, Double.valueOf(streamTokenizer.sval).doubleValue(), true); + } else { + this.setSparseValue(instance, indexValues, attributeValues, numAttribute, this.instanceInformation.attribute(numAttribute).indexOfValue(streamTokenizer.sval), false); + } + } + streamTokenizer.nextToken(); + } + streamTokenizer.nextToken(); //Remove the '}' char + } + streamTokenizer.nextToken(); + //System.out.println("EOL"); + //} + + + } catch (IOException ex) { + Logger.getLogger(ArffLoader.class.getName()).log(Level.SEVERE, null, ex); + } + int[] arrayIndexValues = new int[attributeValues.size()]; + double[] arrayAttributeValues = new double[attributeValues.size()]; + for (int i = 0; i < 
arrayIndexValues.length; i++) { + arrayIndexValues[i] = indexValues.get(i).intValue(); + arrayAttributeValues[i] = attributeValues.get(i).doubleValue(); + } + instance.addSparseValues(arrayIndexValues, arrayAttributeValues, this.instanceInformation.numAttributes()); + return instance; + + } + + private void setSparseValue(Instance instance, List<Integer> indexValues, List<Double> attributeValues, int numAttribute, double value, boolean isNumber) { + double valueAttribute; + if (isNumber && this.instanceInformation.attribute(numAttribute).isNominal) { + valueAttribute = this.instanceInformation.attribute(numAttribute).indexOfValue(Double.toString(value)); + } else { + valueAttribute = value; + } + if (this.instanceInformation.classIndex() == numAttribute) { + instance.setClassValue(valueAttribute); + } else { + //instance.setValue(numAttribute, valueAttribute); + indexValues.add(numAttribute); + attributeValues.add(valueAttribute); + } + //System.out.println(numAttribute+":"+valueAttribute+","+this.instanceInformation.classIndex()+","+value); + } + + private Instance readDenseInstanceSparse() { + //Returns a dense instance + Instance instance = new DenseInstance(this.instanceInformation.numAttributes() + 1); + //System.out.println(this.instanceInformation.numAttributes()); + int numAttribute; + try { + //while (streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + streamTokenizer.nextToken(); // Remove the '{' char + //For each line + while (streamTokenizer.ttype != StreamTokenizer.TT_EOL + && streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + while (streamTokenizer.ttype != '}') { + //For each item + //streamTokenizer.nextToken(); + //while (streamTokenizer.ttype != '}'){ + //System.out.print(streamTokenizer.nval+":"); + numAttribute = (int) streamTokenizer.nval; + streamTokenizer.nextToken(); + + if (streamTokenizer.ttype == StreamTokenizer.TT_NUMBER) { + //System.out.print(streamTokenizer.nval + " "); + this.setValue(instance, numAttribute, streamTokenizer.nval, 
true); + //numAttribute++; + + } else if (streamTokenizer.sval != null && (streamTokenizer.ttype == StreamTokenizer.TT_WORD + || streamTokenizer.ttype == 34)) { + //System.out.print(streamTokenizer.sval + "/"+this.instanceInformation.attribute(numAttribute).indexOfValue(streamTokenizer.sval)+" "); + if (attributes.get(numAttribute).isNumeric()) { + this.setValue(instance, numAttribute, Double.valueOf(streamTokenizer.sval).doubleValue(), true); + } else { + this.setValue(instance, numAttribute, this.instanceInformation.attribute(numAttribute).indexOfValue(streamTokenizer.sval), false); + //numAttribute++; + } + } + streamTokenizer.nextToken(); + } + streamTokenizer.nextToken(); //Remove the '}' char + } + streamTokenizer.nextToken(); + //System.out.println("EOL"); + //} + + + } catch (IOException ex) { + Logger.getLogger(ArffLoader.class.getName()).log(Level.SEVERE, null, ex); + } + return instance; + } + + protected List<Attribute> attributes; + + private InstanceInformation getHeader() { + + String relation = "file stream"; + //System.out.println("RELATION " + relation); + attributes = new ArrayList<Attribute>(); + try { + streamTokenizer.nextToken(); + while (streamTokenizer.ttype != StreamTokenizer.TT_EOF) { + //For each line + //if (streamTokenizer.ttype == '@') { + if (streamTokenizer.ttype == StreamTokenizer.TT_WORD && streamTokenizer.sval.startsWith("@") == true) { + //streamTokenizer.nextToken(); + String token = streamTokenizer.sval.toUpperCase(); + if (token.startsWith("@RELATION")) { + streamTokenizer.nextToken(); + relation = streamTokenizer.sval; + //System.out.println("RELATION " + relation); + } else if (token.startsWith("@ATTRIBUTE")) { + streamTokenizer.nextToken(); + String name = streamTokenizer.sval; + //System.out.println("* " + name); + if (name == null) { + name = Double.toString(streamTokenizer.nval); + } + streamTokenizer.nextToken(); + String type = streamTokenizer.sval; + //System.out.println("* " + name + ":" + type + " "); + if 
(streamTokenizer.ttype == '{') { + streamTokenizer.nextToken(); + List<String> attributeLabels = new ArrayList<String>(); + while (streamTokenizer.ttype != '}') { + + if (streamTokenizer.sval != null) { + attributeLabels.add(streamTokenizer.sval); + //System.out.print(streamTokenizer.sval + ","); + } else { + attributeLabels.add(Double.toString(streamTokenizer.nval)); + //System.out.print(streamTokenizer.nval + ","); + } + + streamTokenizer.nextToken(); + } + //System.out.println(); + attributes.add(new Attribute(name, attributeLabels)); + } else { + // Add attribute + attributes.add(new Attribute(name)); + } + + } else if (token.startsWith("@DATA")) { + //System.out.print("END"); + streamTokenizer.nextToken(); + break; + } + } + streamTokenizer.nextToken(); + } + + } catch (IOException ex) { + Logger.getLogger(ArffLoader.class.getName()).log(Level.SEVERE, null, ex); + } + return new InstanceInformation(relation, attributes); + } + + private void initStreamTokenizer(Reader reader) { + BufferedReader br = new BufferedReader(reader); + + //Init streamTokenizer + streamTokenizer = new StreamTokenizer(br); + + streamTokenizer.resetSyntax(); + streamTokenizer.whitespaceChars(0, ' '); + streamTokenizer.wordChars(' ' + 1, '\u00FF'); + streamTokenizer.whitespaceChars(',', ','); + streamTokenizer.commentChar('%'); + streamTokenizer.quoteChar('"'); + streamTokenizer.quoteChar('\''); + streamTokenizer.ordinaryChar('{'); + streamTokenizer.ordinaryChar('}'); + streamTokenizer.eolIsSignificant(true); + + this.instanceInformation = this.getHeader(); + if (classAttribute < 0) { + this.instanceInformation.setClassIndex(this.instanceInformation.numAttributes() - 1); + //System.out.print(this.instanceInformation.classIndex()); + } else if (classAttribute > 0) { + this.instanceInformation.setClassIndex(classAttribute - 1); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Attribute.java 
---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Attribute.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Attribute.java new file mode 100644 index 0000000..8f3873c --- /dev/null +++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Attribute.java @@ -0,0 +1,205 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package com.yahoo.labs.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.Serializable; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * + * @author abifet + */ +public class Attribute implements Serializable{ + + + public static final String ARFF_ATTRIBUTE = "@attribute"; + public static final String ARFF_ATTRIBUTE_NUMERIC = "NUMERIC"; + + + /** + * + */ + protected boolean isNominal; + /** + * + */ + protected boolean isNumeric; + /** + * + */ + protected boolean isDate; + /** + * + */ + protected String name; + /** + * + */ + protected List<String> attributeValues; + + /** + * + * @return + */ + public List<String> getAttributeValues() { + return attributeValues; + } + /** + * + */ + protected int index; + + /** + * + * @param string + */ + public Attribute(String string) { + this.name = string; + this.isNumeric = true; + } + + /** + * + * @param attributeName + * @param attributeValues + */ + public Attribute(String attributeName, List<String> attributeValues) { + this.name = attributeName; + this.attributeValues = attributeValues; + this.isNominal = true; + } + + /** + * + */ + public Attribute() { + this(""); + } + + /** + * + * @return + */ + public boolean isNominal() { + return this.isNominal; + } + + /** + * + * @return + */ + public String name() { + return this.name; + } + + /** + * + * @param value + * @return + */ + public String value(int value) { + return attributeValues.get(value); + } + + /** + * + * @return + */ + public boolean isNumeric() { + return isNumeric; + } + + /** + * + * @return + */ + public int numValues() { + if (isNumeric()) { + return 0; + } + else { + return attributeValues.size(); + } + } + + /** + * + * @return + */ + public int index() { //RuleClassifier + return this.index; + } + + String formatDate(double value) { + SimpleDateFormat sdf = new SimpleDateFormat(); + return sdf.format(new Date((long) value)); + } + + boolean isDate() { + return isDate; + } + private 
Map<String, Integer> valuesStringAttribute; + + /** + * + * @param value + * @return + */ + public final int indexOfValue(String value) { + + if (isNominal() == false) { + return -1; + } + if (this.valuesStringAttribute == null) { + this.valuesStringAttribute = new HashMap<String, Integer>(); + int count = 0; + for (String stringValue : attributeValues) { + this.valuesStringAttribute.put(stringValue, count); + count++; + } + } + Integer val = (Integer) this.valuesStringAttribute.get(value); + if (val == null) { + return -1; + } else { + return val.intValue(); + } + } + + @Override + public String toString() { + StringBuffer text = new StringBuffer(); + + text.append(ARFF_ATTRIBUTE).append(" ").append(Utils.quote(this.name)).append(" "); + + text.append(ARFF_ATTRIBUTE_NUMERIC); + + return text.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/DenseInstance.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/DenseInstance.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/DenseInstance.java new file mode 100644 index 0000000..b63f736 --- /dev/null +++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/DenseInstance.java @@ -0,0 +1,72 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package com.yahoo.labs.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * + * @author abifet + */ +public class DenseInstance extends SingleLabelInstance { + + private static final long serialVersionUID = 280360594027716737L; + + public DenseInstance() { + // necessary for kryo serializer + } + + public DenseInstance(double weight, double[] res) { + super(weight,res); + } + public DenseInstance(SingleLabelInstance inst) { + super(inst); + } + + public DenseInstance(Instance inst) { + super((SingleLabelInstance) inst); + } + public DenseInstance(double numberAttributes) { + super((int) numberAttributes); + //super(1, new double[(int) numberAttributes-1]); + //Add missing values + //for (int i = 0; i < numberAttributes-1; i++) { + // //this.setValue(i, Double.NaN); + //} + + } + + @Override + public String toString() { + StringBuffer text = new StringBuffer(); + + for (int i = 0; i < this.instanceInformation.numAttributes(); i++) { + if (i > 0) + text.append(","); + text.append(this.value(i)); + } + text.append(",").append(this.weight()); + + return text.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/DenseInstanceData.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/DenseInstanceData.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/DenseInstanceData.java new file mode 100644 index 0000000..e83a4d9 --- /dev/null +++ 
b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/DenseInstanceData.java @@ -0,0 +1,97 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package com.yahoo.labs.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * + * @author abifet + */ +public class DenseInstanceData implements InstanceData{ + + public DenseInstanceData(double[] array) { + this.attributeValues = array; + } + + public DenseInstanceData(int length) { + this.attributeValues = new double[length]; + } + + public DenseInstanceData() { + this(0); + } + + protected double[] attributeValues; + + @Override + public int numAttributes() { + return this.attributeValues.length; + } + + @Override + public double value(int indexAttribute) { + return this.attributeValues[indexAttribute]; + } + + @Override + public boolean isMissing(int indexAttribute) { + return Double.isNaN(this.value(indexAttribute)); + } + + @Override + public int numValues() { + return numAttributes(); + } + + @Override + public int index(int indexAttribute) { + return indexAttribute; + } + + @Override + public double valueSparse(int indexAttribute) { + return value(indexAttribute); + } + + @Override + public boolean isMissingSparse(int indexAttribute) { + return isMissing(indexAttribute); + } + + /*@Override + public double value(Attribute attribute) { + return 
value(attribute.index()); + }*/ + + @Override + public double[] toDoubleArray() { + return attributeValues.clone(); + } + + @Override + public void setValue(int attributeIndex, double d) { + this.attributeValues[attributeIndex] = d; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Instance.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Instance.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Instance.java new file mode 100644 index 0000000..7d8e337 --- /dev/null +++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Instance.java @@ -0,0 +1,74 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package com.yahoo.labs.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ + +import java.io.Serializable; + +/** + * + * @author abifet + */ + +public interface Instance extends Serializable{ + + double weight(); + void setWeight(double weight); + + //Attributes + Attribute attribute(int instAttIndex); + void deleteAttributeAt(int i); + void insertAttributeAt(int i); + int numAttributes(); + public void addSparseValues(int[] indexValues, double[] attributeValues, int numberAttributes); + + + //Values + int numValues(); + String stringValue(int i); + double value(int instAttIndex); + double value(Attribute attribute); + void setValue(int m_numAttributes, double d); + boolean isMissing(int instAttIndex); + int index(int i); + double valueSparse(int i); + boolean isMissingSparse(int p1); + double[] toDoubleArray(); + + //Class + Attribute classAttribute(); + int classIndex(); + boolean classIsMissing(); + double classValue(); + int numClasses(); + void setClassValue(double d); + + Instance copy(); + + //Dataset + void setDataset(Instances dataset); + Instances dataset(); + String toString(); +} + http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstanceData.java ---------------------------------------------------------------------- diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstanceData.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstanceData.java new file mode 100644 index 0000000..e8492a9 --- /dev/null +++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstanceData.java @@ -0,0 +1,55 @@ +/* + * To change this template, choose Tools | Templates + * and open the template in the editor. + */ +package com.yahoo.labs.samoa.instances; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2013 Yahoo! Inc. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+
+/**
+ * Storage for the attribute values of a single instance.
+ *
+ * <p>Values are addressed by attribute index ({@link #value(int)},
+ * {@link #isMissing(int)}, {@link #setValue(int, double)}) or, through the
+ * "sparse" accessors ({@link #index(int)}, {@link #valueSparse(int)},
+ * {@link #isMissingSparse(int)}), presumably by position among the stored
+ * values (NOTE(review): confirm against the sparse implementation).
+ *
+ * @author abifet
+ */
+public interface InstanceData extends Serializable {
+
+    /** @return the number of attributes */
+    int numAttributes();
+
+    /** @return the value of the attribute at index {@code instAttIndex} */
+    double value(int instAttIndex);
+
+    /** @return whether the value of the attribute at {@code instAttIndex} is missing */
+    boolean isMissing(int instAttIndex);
+
+    /** @return the number of stored values */
+    int numValues();
+
+    /** @return the attribute index of the {@code i}-th stored value (see class note) */
+    int index(int i);
+
+    /** @return the {@code i}-th stored value (see class note) */
+    double valueSparse(int i);
+
+    /** @return whether the {@code i}-th stored value is missing (see class note) */
+    boolean isMissingSparse(int i);
+
+    /** @return the attribute values as an array */
+    double[] toDoubleArray();
+
+    /**
+     * Sets the value of one attribute.
+     *
+     * @param attributeIndex index of the attribute to set
+     * @param d the new value
+     */
+    void setValue(int attributeIndex, double d);
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstanceInformation.java
----------------------------------------------------------------------
diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstanceInformation.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstanceInformation.java
new file mode 100644
index 0000000..ff22762
--- /dev/null
+++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/InstanceInformation.java
@@ -0,0 +1,112 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Header information shared by a set of instances: the relation (dataset)
+ * name, the list of attributes and the index of the class attribute.
+ *
+ * @author abifet
+ */
+public class InstanceInformation implements Serializable {
+
+    // Open design question: should Instances be split into a plain list of
+    // instances plus a separate InstanceInformation header?
+
+    /** Name of the relation (dataset). */
+    protected String relationName;
+
+    /** Attribute descriptors, by attribute position. */
+    protected List<Attribute> attributes;
+
+    /** Position of the class attribute within {@link #attributes}. */
+    protected int classIndex;
+
+    /**
+     * Creates a header with the same relation name, attribute list and class
+     * index as {@code chunk}. The attribute list is shared, not copied.
+     *
+     * @param chunk the header to copy from
+     */
+    public InstanceInformation(InstanceInformation chunk) {
+        this(chunk.relationName, chunk.attributes);
+        this.classIndex = chunk.classIndex;
+    }
+
+    /**
+     * Creates a header with the given relation name and attributes; the class
+     * index is left at 0.
+     *
+     * @param relationName name of the relation
+     * @param attributes attribute descriptors (stored by reference)
+     */
+    public InstanceInformation(String relationName, List<Attribute> attributes) {
+        this.relationName = relationName;
+        this.attributes = attributes;
+    }
+
+    /** Creates an empty header: no relation name, no attribute list, class index 0. */
+    public InstanceInformation() {
+        this(null, null);
+    }
+
+    // Header accessors
+
+    /** @param name the new relation name */
+    public void setRelationName(String name) {
+        relationName = name;
+    }
+
+    /** @return the relation name (may be {@code null}) */
+    public String getRelationName() {
+        return relationName;
+    }
+
+    /** @return the position of the class attribute */
+    public int classIndex() {
+        return this.classIndex;
+    }
+
+    /** @param classIndex the new position of the class attribute */
+    public void setClassIndex(int classIndex) {
+        this.classIndex = classIndex;
+    }
+
+    /** @return the attribute at the current class index */
+    public Attribute classAttribute() {
+        return attribute(classIndex());
+    }
+
+    /** @return the number of attributes (NPE if no attribute list has been set) */
+    public int numAttributes() {
+        return attributes.size();
+    }
+
+    /**
+     * @param index position of the attribute
+     * @return the attribute at {@code index}
+     */
+    public Attribute attribute(int index) {
+        return attributes.get(index);
+    }
+
+    /** @return the number of values of the class attribute */
+    public int numClasses() {
+        Attribute classAttr = attributes.get(classIndex());
+        return classAttr.numValues();
+    }
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    public void deleteAttributeAt(Integer index) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    public void insertAttributeAt(Attribute attribute, int i) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    /** @param attributes the new attribute list (stored by reference) */
+    public void setAttributes(List<Attribute> attributes) {
+        this.attributes = attributes;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/787864b6/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Instances.java
----------------------------------------------------------------------
diff --git a/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Instances.java b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Instances.java
new file mode 100644
index 0000000..85b52ec
--- /dev/null
+++ b/samoa-instances/src/main/java/com/yahoo/labs/samoa/instances/Instances.java
@@ -0,0 +1,246 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package com.yahoo.labs.samoa.instances;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Copyright (C) 2013 Yahoo! Inc.
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.Reader;
+import java.io.Serializable;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * An ordered list of {@link Instance}s plus the header that describes them
+ * ({@link InstanceInformation}: relation name, attributes and class index).
+ * The header accessors of this class delegate to that header.
+ *
+ * @author abifet
+ */
+public class Instances implements Serializable {
+
+    /** ARFF keyword that introduces the relation name. */
+    public static final String ARFF_RELATION = "@relation";
+
+    /** ARFF keyword that introduces the data section. */
+    public static final String ARFF_DATA = "@data";
+
+    /** Header (relation name, attributes, class index) of this dataset. */
+    protected InstanceInformation instanceInformation;
+
+    /** The instances of this dataset. */
+    protected List<Instance> instances;
+
+    /** Loader used to obtain the header and to read instances; not serialized. */
+    protected transient ArffLoader arff;
+
+    /**
+     * Class attribute index passed to the {@link ArffLoader} when one is
+     * created. It is independent of the header's class index, which is what
+     * {@link #classIndex()} and {@link #setClassIndex(int)} use.
+     */
+    protected int classAttribute;
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    public Instances(InstancesHeader modelContext) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    /**
+     * Copy constructor. The header is shared with {@code chunk}; the instance
+     * list is copied, so adding, removing or reordering instances in either
+     * dataset does not affect the other. The {@link Instance} objects
+     * themselves are shared, not cloned.
+     *
+     * @param chunk the dataset to copy
+     */
+    public Instances(Instances chunk) {
+        this.instanceInformation = chunk.instanceInformation();
+        this.classAttribute = chunk.classAttribute;
+        if (chunk.instances == null) {
+            this.instances = new ArrayList<>();
+        } else {
+            this.instances = new ArrayList<>(chunk.instances);
+        }
+    }
+
+    /**
+     * Creates a dataset with an empty instance list and no header.
+     * {@code instanceInformation} stays {@code null}, so the header accessors
+     * throw {@link NullPointerException} until it is assigned (e.g. by a
+     * subclass).
+     */
+    public Instances() {
+        // An empty list (rather than null) keeps numInstances(), add() and
+        // readInstance() usable on a freshly constructed dataset.
+        this.instances = new ArrayList<>();
+    }
+
+    /**
+     * Creates a dataset whose header is obtained from an {@link ArffLoader}
+     * over {@code reader} (via {@code getStructure()}). The instance list
+     * starts empty; use {@link #readInstance(Reader)} to load instances.
+     *
+     * @param reader source of ARFF data
+     * @param size currently unused
+     * @param classAttribute class attribute index passed to the loader
+     */
+    public Instances(Reader reader, int size, int classAttribute) {
+        this.classAttribute = classAttribute;
+        // NOTE(review): the second ArffLoader argument is hard-coded to 0 and
+        // 'size' is ignored - confirm what the loader expects there.
+        arff = new ArffLoader(reader, 0, classAttribute);
+        this.instanceInformation = arff.getStructure();
+        this.instances = new ArrayList<>();
+    }
+
+    /**
+     * Creates an empty dataset with the same header as {@code chunk}; none of
+     * the instances of {@code chunk} are copied or shared.
+     *
+     * @param chunk dataset whose header is reused
+     * @param capacity initial capacity of the instance list (negative values
+     *        are treated as 0)
+     */
+    public Instances(Instances chunk, int capacity) {
+        this.instanceInformation = chunk.instanceInformation();
+        this.classAttribute = chunk.classAttribute;
+        this.instances = new ArrayList<>(Math.max(0, capacity));
+    }
+
+    /**
+     * Creates an empty dataset with a new header.
+     *
+     * @param st relation name
+     * @param v attribute descriptors (stored by reference in the header)
+     * @param capacity initial capacity of the instance list (negative values
+     *        are treated as 0)
+     */
+    public Instances(String st, List<Attribute> v, int capacity) {
+        this.instanceInformation = new InstanceInformation(st, v);
+        this.instances = new ArrayList<>(Math.max(0, capacity));
+    }
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    public Instances(Instances chunk, int i, int j) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    public Instances(StringReader st, int v) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    // Header accessors (delegate to instanceInformation)
+
+    /** @param string the new relation name */
+    public void setRelationName(String string) {
+        this.instanceInformation.setRelationName(string);
+    }
+
+    /** @return the relation name */
+    public String getRelationName() {
+        return this.instanceInformation.getRelationName();
+    }
+
+    /** @return the header's class attribute index */
+    public int classIndex() {
+        return this.instanceInformation.classIndex();
+    }
+
+    /** @param classIndex the new class attribute index of the header */
+    public void setClassIndex(int classIndex) {
+        this.instanceInformation.setClassIndex(classIndex);
+    }
+
+    /** @return the header's class attribute */
+    public Attribute classAttribute() {
+        return this.instanceInformation.classAttribute();
+    }
+
+    /** @return the number of attributes in the header */
+    public int numAttributes() {
+        return this.instanceInformation.numAttributes();
+    }
+
+    /**
+     * @param w position of the attribute
+     * @return the attribute at position {@code w}
+     */
+    public Attribute attribute(int w) {
+        return this.instanceInformation.attribute(w);
+    }
+
+    /** @return the number of values of the class attribute */
+    public int numClasses() {
+        return this.instanceInformation.numClasses();
+    }
+
+    /** Delegates to the header (currently unsupported there). */
+    public void deleteAttributeAt(Integer integer) {
+        this.instanceInformation.deleteAttributeAt(integer);
+    }
+
+    /** Delegates to the header (currently unsupported there). */
+    public void insertAttributeAt(Attribute attribute, int i) {
+        this.instanceInformation.insertAttributeAt(attribute, i);
+    }
+
+    // Instance list
+
+    /**
+     * @param num position of the instance
+     * @return the instance at position {@code num}
+     */
+    public Instance instance(int num) {
+        return this.instances.get(num);
+    }
+
+    /** @return the number of instances */
+    public int numInstances() {
+        return this.instances.size();
+    }
+
+    /**
+     * Appends a copy of {@code inst}, obtained via {@link Instance#copy()}.
+     *
+     * @param inst the instance to add
+     */
+    public void add(Instance inst) {
+        this.instances.add(inst.copy());
+    }
+
+    /**
+     * Shuffles the instances in place (Fisher-Yates).
+     *
+     * @param random source of randomness
+     */
+    public void randomize(Random random) {
+        for (int j = numInstances() - 1; j > 0; j--) {
+            swap(j, random.nextInt(j + 1));
+        }
+    }
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    public void stratify(int numFolds) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    public Instances trainCV(int numFolds, int n, Random random) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    public Instances testCV(int numFolds, int n) {
+        throw new UnsupportedOperationException("Not yet implemented");
+    }
+
+    /** Not supported yet; always throws {@link UnsupportedOperationException}. */
+    public double meanOrMode(int j) {
+        throw new UnsupportedOperationException("Not yet implemented"); // CobWeb
+    }
+
+    /**
+     * Reads the next instance from {@code fileReader}, attaches it to this
+     * dataset and appends a copy of it (see {@link #add(Instance)}). If no
+     * loader exists yet, one is created on {@code fileReader} using
+     * {@link #classAttribute}.
+     *
+     * @param fileReader source of ARFF data
+     * @return {@code true} if an instance was read, {@code false} if the
+     *         loader returned {@code null}
+     */
+    public boolean readInstance(Reader fileReader) {
+        if (arff == null) {
+            arff = new ArffLoader(fileReader, 0, this.classAttribute);
+        }
+        Instance inst = arff.readInstance(fileReader);
+        if (inst == null) {
+            return false;
+        }
+        inst.setDataset(this);
+        add(inst);
+        return true;
+    }
+
+    /** Removes all instances by replacing the list with a new empty one. */
+    public void delete() {
+        this.instances = new ArrayList<>();
+    }
+
+    /**
+     * Exchanges the instances at positions {@code i} and {@code j}.
+     *
+     * @param i first position
+     * @param j second position
+     */
+    public void swap(int i, int j) {
+        Instance in = instances.get(i);
+        instances.set(i, instances.get(j));
+        instances.set(j, in);
+    }
+
+    /** @return the header of this dataset */
+    private InstanceInformation instanceInformation() {
+        return this.instanceInformation;
+    }
+
+    /**
+     * @param name attribute name to look up
+     * @return the first attribute whose name equals {@code name}, or
+     *         {@code null} if there is none
+     */
+    public Attribute attribute(String name) {
+        for (int i = 0; i < numAttributes(); i++) {
+            if (attribute(i).name().equals(name)) {
+                return attribute(i);
+            }
+        }
+        return null;
+    }
+
+    /** @return the instances, one per line, without the header */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder();
+
+        for (int i = 0; i < numInstances(); i++) {
+            text.append(instance(i).toString());
+            if (i < numInstances() - 1) {
+                text.append('\n');
+            }
+        }
+        return text.toString();
+    }
+
+    /**
+     * @return the dataset in ARFF form: the relation line, one line per
+     *         attribute, then the data section produced by {@link #toString()}
+     */
+    public String toStringArff() {
+        StringBuilder text = new StringBuilder();
+
+        text.append(ARFF_RELATION).append(" ")
+                .append(Utils.quote(getRelationName())).append("\n\n");
+        for (int i = 0; i < numAttributes(); i++) {
+            text.append(attribute(i).toString()).append("\n");
+        }
+        text.append("\n").append(ARFF_DATA).append("\n");
+
+        text.append(toString());
+        return text.toString();
+    }
+}
