http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/AbstractEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/AbstractEntranceProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractEntranceProcessingItem.java new file mode 100644 index 0000000..d1d22ba --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractEntranceProcessingItem.java @@ -0,0 +1,115 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; + +/** + * Helper class for EntranceProcessingItem implementation. + * + * @author Anh Thu Vu + * + */ +public abstract class AbstractEntranceProcessingItem implements EntranceProcessingItem { + private EntranceProcessor processor; + private String name; + private Stream outputStream; + + /* + * Constructor + */ + public AbstractEntranceProcessingItem() { + this(null); + } + + public AbstractEntranceProcessingItem(EntranceProcessor processor) { + this.processor = processor; + } + + /* + * Processor + */ + /** + * Set the entrance processor for this EntranceProcessingItem + * + * @param processor + * the processor + */ + protected void setProcessor(EntranceProcessor processor) { + this.processor = processor; + } + + /** + * Get the EntranceProcessor of this EntranceProcessingItem. + * + * @return the EntranceProcessor + */ + public EntranceProcessor getProcessor() { + return this.processor; + } + + /* + * Name/ID + */ + /** + * Set the name (or ID) of this EntranceProcessingItem + * + * @param name + */ + public void setName(String name) { + this.name = name; + } + + /** + * Get the name (or ID) of this EntranceProcessingItem + * + * @return the name (or ID) + */ + public String getName() { + return this.name; + } + + /* + * Output Stream + */ + /** + * Set the output stream of this EntranceProcessingItem. An EntranceProcessingItem should have only 1 single output + * stream and should not be re-assigned. + * + * @return this EntranceProcessingItem + */ + public EntranceProcessingItem setOutputStream(Stream outputStream) { + if (this.outputStream != null && this.outputStream != outputStream) { + throw new IllegalStateException("Cannot overwrite output stream of EntranceProcessingItem"); + } else + this.outputStream = outputStream; + return this; + } + + /** + * Get the output stream of this EntranceProcessingItem. + * + * @return the output stream + */ + public Stream getOutputStream() { + return this.outputStream; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/AbstractProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/AbstractProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractProcessingItem.java new file mode 100644 index 0000000..112d76e --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractProcessingItem.java @@ -0,0 +1,168 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.Processor; +import org.apache.samoa.utils.PartitioningScheme; + +/** + * Abstract ProcessingItem + * + * Helper for implementation of ProcessingItem. It has basic information for a ProcessingItem: name, parallelismLevel + * and a processor. Subclass of this class needs to implement {@link #addInputStream(Stream, PartitioningScheme)}. + * + * @author Anh Thu Vu + * + */ +public abstract class AbstractProcessingItem implements ProcessingItem { + private String name; + private int parallelism; + private Processor processor; + + /* + * Constructor + */ + public AbstractProcessingItem() { + this(null); + } + + public AbstractProcessingItem(Processor processor) { + this(processor, 1); + } + + public AbstractProcessingItem(Processor processor, int parallelism) { + this.processor = processor; + this.parallelism = parallelism; + } + + /* + * Processor + */ + /** + * Set the processor for this ProcessingItem + * + * @param processor + * the processor + */ + protected void setProcessor(Processor processor) { + this.processor = processor; + } + + /** + * Get the processor of this ProcessingItem + * + * @return the processor + */ + public Processor getProcessor() { + return this.processor; + } + + /* + * Parallelism + */ + /** + * Set the parallelism factor of this ProcessingItem + * + * @param parallelism + */ + protected void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + /** + * Get the parallelism factor of this ProcessingItem + * + * @return the parallelism factor + */ + @Override + public int getParallelism() { + return this.parallelism; + } + + /* + * Name/ID + */ + /** + * Set the name (or ID) of this ProcessingItem + * + * @param name + * the name/ID + */ + public void setName(String name) { + this.name = name; + } + + /** + * Get the name (or ID) of this ProcessingItem + * + * @return the name/ID + */ + public String getName() { + return this.name; + } + + /* + * Add input streams + */ + /** + * Add an input stream to this ProcessingItem + * + * @param inputStream + * the input stream to add + * @param scheme + * partitioning scheme associated with this ProcessingItem and the input stream + * @return this ProcessingItem + */ + protected abstract ProcessingItem addInputStream(Stream inputStream, PartitioningScheme scheme); + + /** + * Add an input stream to this ProcessingItem with SHUFFLE scheme + * + * @param inputStream + * the input stream + * @return this ProcessingItem + */ + public ProcessingItem connectInputShuffleStream(Stream inputStream) { + return this.addInputStream(inputStream, PartitioningScheme.SHUFFLE); + } + + /** + * Add an input stream to this ProcessingItem with GROUP_BY_KEY scheme + * + * @param inputStream + * the input stream + * @return this ProcessingItem + */ + public ProcessingItem connectInputKeyStream(Stream inputStream) { + return this.addInputStream(inputStream, PartitioningScheme.GROUP_BY_KEY); + } + + /** + * Add an input stream to this ProcessingItem with BROADCAST scheme + * + * @param inputStream + * the input stream + * @return this ProcessingItem + */ + public ProcessingItem connectInputAllStream(Stream inputStream) { + return this.addInputStream(inputStream, PartitioningScheme.BROADCAST); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/AbstractStream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/AbstractStream.java b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractStream.java new file mode 100644 index 0000000..b79cc61 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractStream.java @@ -0,0 +1,118 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; + +/** + * Abstract Stream + * + * Helper for implementation of Stream. It has basic information for a Stream: streamID and source ProcessingItem. + * Subclass of this class needs to implement {@link #put(ContentEvent)}. + * + * @author Anh Thu Vu + * + */ + +public abstract class AbstractStream implements Stream { + private String streamID; + private IProcessingItem sourcePi; + private int batchSize; + + /* + * Constructor + */ + public AbstractStream() { + this(null); + } + + public AbstractStream(IProcessingItem sourcePi) { + this.sourcePi = sourcePi; + this.batchSize = 1; + } + + /** + * Get source processing item of this stream + * + * @return + */ + public IProcessingItem getSourceProcessingItem() { + return this.sourcePi; + } + + /* + * Process event + */ + @Override + /** + * Send a ContentEvent + * @param event + * the ContentEvent to be sent + */ + public abstract void put(ContentEvent event); + + /* + * Stream name + */ + /** + * Get name (ID) of this stream + * + * @return the name (ID) + */ + @Override + public String getStreamId() { + return this.streamID; + } + + /** + * Set the name (ID) of this stream + * + * @param streamID + * the name (ID) + */ + public void setStreamId(String streamID) { + this.streamID = streamID; + } + + /* + * Batch size + */ + /** + * Set suggested batch size + * + * @param batchSize + * the suggested batch size + * + */ + @Override + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + /** + * Get suggested batch size + * + * @return the suggested batch size + */ + public int getBatchSize() { + return this.batchSize; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/AbstractTopology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/AbstractTopology.java b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractTopology.java new file mode 100644 index 0000000..9d90c41 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/AbstractTopology.java @@ -0,0 +1,133 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.util.HashSet; +import java.util.Set; + +/** + * Topology abstract class. + * + * It manages basic information of a topology: name, sets of Streams and ProcessingItems + * + */ +public abstract class AbstractTopology implements Topology { + + private String topoName; + private Set<Stream> streams; + private Set<IProcessingItem> processingItems; + private Set<EntranceProcessingItem> entranceProcessingItems; + + protected AbstractTopology(String name) { + this.topoName = name; + this.streams = new HashSet<>(); + this.processingItems = new HashSet<>(); + this.entranceProcessingItems = new HashSet<>(); + } + + /** + * Gets the name of this topology + * + * @return name of the topology + */ + public String getTopologyName() { + return this.topoName; + } + + /** + * Sets the name of this topology + * + * @param topologyName + * name of the topology + */ + public void setTopologyName(String topologyName) { + this.topoName = topologyName; + } + + /** + * Adds an Entrance processing item to the topology. + * + * @param epi + * Entrance processing item + */ + public void addEntranceProcessingItem(EntranceProcessingItem epi) { + this.entranceProcessingItems.add(epi); + this.addProcessingItem(epi); + } + + /** + * Gets entrance processing items in the topology + * + * @return the set of processing items + */ + public Set<EntranceProcessingItem> getEntranceProcessingItems() { + return this.entranceProcessingItems; + } + + /** + * Add processing item to topology. + * + * @param procItem + * Processing item. + */ + public void addProcessingItem(IProcessingItem procItem) { + addProcessingItem(procItem, 1); + } + + /** + * Add processing item to topology. + * + * @param procItem + * Processing item. + * @param parallelismHint + * Processing item parallelism level. + */ + public void addProcessingItem(IProcessingItem procItem, int parallelismHint) { + this.processingItems.add(procItem); + } + + /** + * Gets processing items in the topology (including entrance processing items) + * + * @return the set of processing items + */ + public Set<IProcessingItem> getProcessingItems() { + return this.processingItems; + } + + /** + * Add stream to topology. + * + * @param stream + */ + public void addStream(Stream stream) { + this.streams.add(stream); + } + + /** + * Gets streams in the topology + * + * @return the set of streams + */ + public Set<Stream> getStreams() { + return this.streams; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/ComponentFactory.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/ComponentFactory.java b/samoa-api/src/main/java/org/apache/samoa/topology/ComponentFactory.java new file mode 100644 index 0000000..d1e0c13 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/ComponentFactory.java @@ -0,0 +1,78 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; + +/** + * ComponentFactory interface. Provides platform specific components. + */ +public interface ComponentFactory { + + /** + * Creates a platform specific processing item with the specified processor. + * + * @param processor + * contains the logic for this processing item. + * @return ProcessingItem + */ + public ProcessingItem createPi(Processor processor); + + /** + * Creates a platform specific processing item with the specified processor. Additionally sets the parallelism level. + * + * @param processor + * contains the logic for this processing item. + * @param parallelism + * defines the amount of instances of this processing item will be created. + * @return ProcessingItem + */ + public ProcessingItem createPi(Processor processor, int parallelism); + + /** + * Creates a platform specific processing item with the specified processor that is the entrance point in the + * topology. This processing item can either generate a stream of data or connect to an external stream of data. + * + * @param entranceProcessor + * contains the logic for this processing item. + * @return EntranceProcessingItem + */ + public EntranceProcessingItem createEntrancePi(EntranceProcessor entranceProcessor); + + /** + * Creates a platform specific stream. + * + * @param sourcePi + * source processing item which will provide the events for this stream. + * @return Stream + */ + public Stream createStream(IProcessingItem sourcePi); + + /** + * Creates a platform specific topology. + * + * @param topoName + * Topology name. + * @return Topology + */ + public Topology createTopology(String topoName); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/EntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/EntranceProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/topology/EntranceProcessingItem.java new file mode 100644 index 0000000..6fc0ed3 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/EntranceProcessingItem.java @@ -0,0 +1,46 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.EntranceProcessor; + +/** + * Entrance processing item interface. + */ +public interface EntranceProcessingItem extends IProcessingItem { + + @Override + /** + * Gets the processing item processor. + * + * @return the embedded EntranceProcessor. + */ + public EntranceProcessor getProcessor(); + + /** + * Set the single output stream for this EntranceProcessingItem. + * + * @param stream + * the stream + * @return the current instance of the EntranceProcessingItem for fluent interface. + */ + public EntranceProcessingItem setOutputStream(Stream stream); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/IProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/IProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/topology/IProcessingItem.java new file mode 100644 index 0000000..af86bde --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/IProcessingItem.java @@ -0,0 +1,47 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.Processor; + +/** + * ProcessingItem interface specific for entrance processing items. + * + * @author severien + * + */ +public interface IProcessingItem { + + /** + * Gets the processing item processor. + * + * @return Processor + */ + public Processor getProcessor(); + + /** + * Sets processing item name. + * + * @param name + */ + // public void setName(String name); + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/ISubmitter.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/ISubmitter.java b/samoa-api/src/main/java/org/apache/samoa/topology/ISubmitter.java new file mode 100644 index 0000000..b576982 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/ISubmitter.java @@ -0,0 +1,46 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.tasks.Task; + +/** + * Submitter interface for programatically deploying platform specific topologies. + * + * @author severien + * + */ +public interface ISubmitter { + + /** + * Deploy a specific task to a platform. + * + * @param task + */ + public void deployTask(Task task); + + /** + * Sets if the task should run locally or distributed. + * + * @param bool + */ + public void setLocal(boolean bool); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/LocalEntranceProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/LocalEntranceProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/topology/LocalEntranceProcessingItem.java new file mode 100644 index 0000000..3cfa736 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/LocalEntranceProcessingItem.java @@ -0,0 +1,81 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; + +/** + * Implementation of EntranceProcessingItem for local engines (Simple, Multithreads) + * + * @author Anh Thu Vu + * + */ +public class LocalEntranceProcessingItem extends AbstractEntranceProcessingItem { + public LocalEntranceProcessingItem(EntranceProcessor processor) { + super(processor); + } + + /** + * If there are available events, first event in the queue will be sent out on the output stream. + * + * @return true if there is (at least) one available event and it was sent out false otherwise + */ + public boolean injectNextEvent() { + if (this.getProcessor().hasNext()) { + ContentEvent event = this.getProcessor().nextEvent(); + this.getOutputStream().put(event); + return true; + } + return false; + } + + /** + * Start sending events by calling {@link #injectNextEvent()}. If there are no available events, and that the stream + * is not entirely consumed, it will wait by calling {@link #waitForNewEvents()} before attempting to send again. </p> + * When the stream is entirely consumed, the last event is tagged accordingly and the processor gets the finished + * status. + * + */ + public void startSendingEvents() { + if (this.getOutputStream() == null) + throw new IllegalStateException("Try sending events from EntrancePI while outputStream is not set."); + + while (!this.getProcessor().isFinished()) { + if (!this.injectNextEvent()) { + try { + waitForNewEvents(); + } catch (Exception e) { + e.printStackTrace(); + break; + } + } + } + } + + /** + * Method to wait for an amount of time when there are no available events. Implementation of EntranceProcessingItem + * should override this method to implement non-blocking wait or to adjust the amount of time. + */ + protected void waitForNewEvents() throws Exception { + Thread.sleep(100); + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/ProcessingItem.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/ProcessingItem.java b/samoa-api/src/main/java/org/apache/samoa/topology/ProcessingItem.java new file mode 100644 index 0000000..edb4aaa --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/ProcessingItem.java @@ -0,0 +1,68 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Processing item interface. + * + * @author severien + * + */ +public interface ProcessingItem extends IProcessingItem { + + /** + * Connects this processing item in a round robin fashion. The events will be distributed evenly between the + * instantiated processing items. + * + * @param inputStream + * Stream to connect this processing item. + * @return ProcessingItem + */ + public ProcessingItem connectInputShuffleStream(Stream inputStream); + + /** + * Connects this processing item taking the event key into account. Events will be routed to the processing item + * according to the modulus of its key and the paralellism level. Ex.: key = 5 and paralellism = 2, 5 mod 2 = 1. + * Processing item responsible for 1 will receive this event. + * + * @param inputStream + * Stream to connect this processing item. + * @return ProcessingItem + */ + public ProcessingItem connectInputKeyStream(Stream inputStream); + + /** + * Connects this processing item to the stream in a broadcast fashion. All processing items of this type will receive + * copy of the original event. + * + * @param inputStream + * Stream to connect this processing item. + * @return ProcessingItem + */ + public ProcessingItem connectInputAllStream(Stream inputStream); + + /** + * Gets processing item parallelism level. + * + * @return int + */ + public int getParallelism(); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/Stream.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/Stream.java b/samoa-api/src/main/java/org/apache/samoa/topology/Stream.java new file mode 100644 index 0000000..7aadb97 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/Stream.java @@ -0,0 +1,61 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.core.ContentEvent; + +/** + * Stream interface. + * + * @author severien + * + */ +public interface Stream { + + /** + * Puts events into a platform specific data stream. + * + * @param event + */ + public void put(ContentEvent event); + + /** + * Sets the stream id which is represented by a name. + * + * @param stream + */ + // public void setStreamId(String stream); + + /** + * Gets stream id. + * + * @return id + */ + public String getStreamId(); + + /** + * Set batch size + * + * @param batchSize + * the suggested size for batching messages on this stream + */ + public void setBatchSize(int batchsize); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/Topology.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/Topology.java b/samoa-api/src/main/java/org/apache/samoa/topology/Topology.java new file mode 100644 index 0000000..41cbdd0 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/Topology.java @@ -0,0 +1,82 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +public interface Topology { + /* + * Name + */ + /** + * Get the topology's name + * + * @return the name of the topology + */ + public String getTopologyName(); + + /** + * Set the topology's name + * + * @param topologyName + * the name of the topology + */ + public void setTopologyName(String topologyName); + + /* + * Entrance Processing Items + */ + /** + * Add an EntranceProcessingItem to this topology + * + * @param epi + * the EntranceProcessingItem to be added + */ + void addEntranceProcessingItem(EntranceProcessingItem epi); + + /* + * Processing Items + */ + /** + * Add a ProcessingItem to this topology with default parallelism level (i.e. 1) + * + * @param procItem + * the ProcessingItem to be added + */ + void addProcessingItem(IProcessingItem procItem); + + /** + * Add a ProcessingItem to this topology with an associated parallelism level + * + * @param procItem + * the ProcessingItem to be added + * @param parallelismHint + * the parallelism level + */ + void addProcessingItem(IProcessingItem procItem, int parallelismHint); + + /* + * Streams + */ + /** + * + * @param stream + */ + void addStream(Stream stream); +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/topology/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/topology/TopologyBuilder.java b/samoa-api/src/main/java/org/apache/samoa/topology/TopologyBuilder.java new file mode 100644 index 0000000..d67b609 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/topology/TopologyBuilder.java @@ -0,0 +1,228 @@ +package org.apache.samoa.topology; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +import java.util.HashMap; +import java.util.Map; + +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; + +import com.google.common.base.Preconditions; + +/** + * Builder class that creates topology components and assemble them together. + * + */ +public class TopologyBuilder { + + // TODO: + // Possible options: + // 1. we may convert this as interface and platform dependent builder will + // inherit this method + // 2. refactor by combining TopologyBuilder, ComponentFactory and Topology + // -ve -> fat class where it has capabilities to instantiate specific + // component and connecting them + // +ve -> easy abstraction for SAMOA developer + // "you just implement your builder logic here!" + private ComponentFactory componentFactory; + private Topology topology; + private Map<Processor, IProcessingItem> mapProcessorToProcessingItem; + + // TODO: refactor, temporary constructor used by Storm code + public TopologyBuilder() { + // TODO: initialize _componentFactory using dynamic binding + // for now, use StormComponentFactory + // should the factory be Singleton (?) + // ans: at the moment, no, i.e. each builder will has its associated + // factory! + // and the factory will be instantiated using dynamic binding + // this.componentFactory = new StormComponentFactory(); + } + + // TODO: refactor, temporary constructor used by S4 code + public TopologyBuilder(ComponentFactory theFactory) { + this.componentFactory = theFactory; + } + + /** + * Initiates topology with a specific name. + * + * @param topologyName + */ + public void initTopology(String topologyName) { + this.initTopology(topologyName, 0); + } + + /** + * Initiates topology with a specific name and a delay between consecutive instances. + * + * @param topologyName + * @param delay + * delay between injections of two instances from source (in milliseconds) + */ + public void initTopology(String topologyName, int delay) { + if (this.topology != null) { + // TODO: possible refactor this code later + System.out.println("Topology has been initialized before!"); + return; + } + this.topology = componentFactory.createTopology(topologyName); + } + + /** + * Returns the platform specific topology. + * + * @return + */ + public Topology build() { + return topology; + } + + public ProcessingItem addProcessor(Processor processor, int parallelism) { + ProcessingItem pi = createPi(processor, parallelism); + if (this.mapProcessorToProcessingItem == null) + this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); + this.mapProcessorToProcessingItem.put(processor, pi); + return pi; + } + + public ProcessingItem addProcessor(Processor processor) { + return addProcessor(processor, 1); + } + + public ProcessingItem connectInputShuffleStream(Stream inputStream, Processor processor) { + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + return pi.connectInputShuffleStream(inputStream); + } + + public ProcessingItem connectInputKeyStream(Stream inputStream, Processor processor) { + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + return pi.connectInputKeyStream(inputStream); + } + + public ProcessingItem connectInputAllStream(Stream inputStream, Processor processor) { + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + return pi.connectInputAllStream(inputStream); + } + + public Stream createInputShuffleStream(Processor processor, Processor dest) { + Stream inputStream = this.createStream(dest); + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + pi.connectInputShuffleStream(inputStream); + return inputStream; + } + + public Stream createInputKeyStream(Processor processor, Processor dest) { + Stream inputStream = this.createStream(dest); + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + pi.connectInputKeyStream(inputStream); + return inputStream; + } + + public Stream createInputAllStream(Processor processor, Processor dest) { + Stream inputStream = this.createStream(dest); + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to connect to null PI"); + pi.connectInputAllStream(inputStream); + return inputStream; + } + + public Stream createStream(Processor processor) { + IProcessingItem pi = mapProcessorToProcessingItem.get(processor); + Stream ret = null; + Preconditions.checkNotNull(pi, "Trying to create stream from null PI"); + ret = this.createStream(pi); + if (pi instanceof EntranceProcessingItem) + ((EntranceProcessingItem) pi).setOutputStream(ret); + return ret; + } + + public EntranceProcessingItem addEntranceProcessor(EntranceProcessor entranceProcessor) { + EntranceProcessingItem pi = createEntrancePi(entranceProcessor); + if (this.mapProcessorToProcessingItem == null) + this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); + mapProcessorToProcessingItem.put(entranceProcessor, pi); + return pi; + } + + public ProcessingItem getProcessingItem(Processor processor) { + ProcessingItem pi = (ProcessingItem) mapProcessorToProcessingItem.get(processor); + Preconditions.checkNotNull(pi, "Trying to retrieve null PI"); + return pi; + } + + /** + * Creates a processing item with a specific processor and paralellism level of 1. + * + * @param processor + * @return ProcessingItem + */ + @SuppressWarnings("unused") + private ProcessingItem createPi(Processor processor) { + return createPi(processor, 1); + } + + /** + * Creates a processing item with a specific processor and paralellism level. + * + * @param processor + * @param parallelism + * @return ProcessingItem + */ + private ProcessingItem createPi(Processor processor, int parallelism) { + ProcessingItem pi = this.componentFactory.createPi(processor, parallelism); + this.topology.addProcessingItem(pi, parallelism); + return pi; + } + + /** + * Creates a platform specific entrance processing item. + * + * @param processor + * @return + */ + private EntranceProcessingItem createEntrancePi(EntranceProcessor processor) { + EntranceProcessingItem epi = this.componentFactory.createEntrancePi(processor); + this.topology.addEntranceProcessingItem(epi); + if (this.mapProcessorToProcessingItem == null) + this.mapProcessorToProcessingItem = new HashMap<Processor, IProcessingItem>(); + this.mapProcessorToProcessingItem.put(processor, epi); + return epi; + } + + /** + * Creates a platform specific stream. + * + * @param sourcePi + * source processing item. + * @return + */ + private Stream createStream(IProcessingItem sourcePi) { + Stream stream = this.componentFactory.createStream(sourcePi); + this.topology.addStream(stream); + return stream; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/utils/PartitioningScheme.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/utils/PartitioningScheme.java b/samoa-api/src/main/java/org/apache/samoa/utils/PartitioningScheme.java new file mode 100644 index 0000000..374120d --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/utils/PartitioningScheme.java @@ -0,0 +1,33 @@ +package org.apache.samoa.utils; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +/** + * Represents the 3 schemes to partition the streams + * + * @author Anh Thu Vu + * + */ +public enum PartitioningScheme { + SHUFFLE, GROUP_BY_KEY, BROADCAST +} +// TODO: use this enum in S4 +// Storm doesn't seem to need this http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/utils/StreamDestination.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/utils/StreamDestination.java b/samoa-api/src/main/java/org/apache/samoa/utils/StreamDestination.java new file mode 100644 index 0000000..be82982 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/utils/StreamDestination.java @@ -0,0 +1,62 @@ +package org.apache.samoa.utils; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import org.apache.samoa.topology.IProcessingItem; + +/** + * Represents one destination for streams. It has the info of: the ProcessingItem, parallelismHint, and partitioning + * scheme. Usage: - When ProcessingItem connects to a stream, it will pass a StreamDestination to the stream. - Stream + * manages a set of StreamDestination. - Used in single-threaded and multi-threaded local mode. + * + * @author Anh Thu Vu + * + */ +public class StreamDestination { + private IProcessingItem pi; + private int parallelism; + private PartitioningScheme type; + + /* + * Constructor + */ + public StreamDestination(IProcessingItem pi, int parallelismHint, PartitioningScheme type) { + this.pi = pi; + this.parallelism = parallelismHint; + this.type = type; + } + + /* + * Getters + */ + public IProcessingItem getProcessingItem() { + return this.pi; + } + + public int getParallelism() { + return this.parallelism; + } + + public PartitioningScheme getPartitioningScheme() { + return this.type; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/main/java/org/apache/samoa/utils/Utils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/utils/Utils.java b/samoa-api/src/main/java/org/apache/samoa/utils/Utils.java new file mode 100644 index 0000000..f8ae5ef --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/utils/Utils.java @@ -0,0 +1,183 @@ +package org.apache.samoa.utils; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.jar.Attributes; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; +import java.util.zip.ZipEntry; + +/** + * Utils class for building and deploying applications programmatically. + * + * @author severien + * + */ +public class Utils { + + public static void buildSamoaPackage() { + try { + String output = "/tmp/samoa/samoa.jar";// System.getProperty("user.home") + "/samoa.jar"; + Manifest manifest = createManifest(); + + BufferedOutputStream bo; + + bo = new BufferedOutputStream(new FileOutputStream(output)); + JarOutputStream jo = new JarOutputStream(bo, manifest); + + String baseDir = System.getProperty("user.dir"); + System.out.println(baseDir); + + File samoaJar = new File(baseDir + "/target/samoa-0.0.1-SNAPSHOT.jar"); + addEntry(jo, samoaJar, baseDir + "/target/", "/app/"); + addLibraries(jo); + + jo.close(); + bo.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + // TODO should get the modules file from the parameters + public static void buildModulesPackage(List<String> modulesNames) { + System.out.println(System.getProperty("user.dir")); + try { + String baseDir = System.getProperty("user.dir"); + List<File> filesArray = new ArrayList<>(); + for (String module : modulesNames) { + module = "/" + module.replace(".", "/") + ".class"; + filesArray.add(new File(baseDir + module)); + } + String output = System.getProperty("user.home") + "/modules.jar"; + + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, + "1.0"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, + "http://samoa.yahoo.com"); + manifest.getMainAttributes().put( + Attributes.Name.IMPLEMENTATION_VERSION, "0.1"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, + "Yahoo"); + manifest.getMainAttributes().put( + Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA"); + + BufferedOutputStream bo; + + bo = new BufferedOutputStream(new FileOutputStream(output)); + JarOutputStream jo = new JarOutputStream(bo, manifest); + + File[] files = filesArray.toArray(new File[filesArray.size()]); + addEntries(jo, files, baseDir, ""); + + jo.close(); + bo.close(); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + private static void addLibraries(JarOutputStream jo) { + try { + String baseDir = System.getProperty("user.dir"); + String libDir = baseDir + "/target/lib"; + File inputFile = new File(libDir); + + File[] files = inputFile.listFiles(); + for (File file : files) { + addEntry(jo, file, baseDir, "lib"); + } + jo.close(); + + } catch (IOException e) { + e.printStackTrace(); + } + } + + private static void addEntries(JarOutputStream jo, File[] files, String baseDir, String rootDir) { + for (File file : files) { + + if (!file.isDirectory()) { + addEntry(jo, file, baseDir, rootDir); + } else { + File dir = new File(file.getAbsolutePath()); + addEntries(jo, dir.listFiles(), baseDir, rootDir); + } + } + } + + private static void addEntry(JarOutputStream jo, File file, String baseDir, String rootDir) { + try { + BufferedInputStream bi = new BufferedInputStream(new FileInputStream(file)); + + String path = file.getAbsolutePath().replaceFirst(baseDir, rootDir); + jo.putNextEntry(new ZipEntry(path)); + + byte[] buf = new byte[1024]; + int anz; + while ((anz = bi.read(buf)) != -1) { + jo.write(buf, 0, anz); + } + bi.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static Manifest createManifest() { + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_URL, "http://samoa.yahoo.com"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VERSION, "0.1"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR, "Yahoo"); + manifest.getMainAttributes().put(Attributes.Name.IMPLEMENTATION_VENDOR_ID, "SAMOA"); + Attributes s4Attributes = new Attributes(); + s4Attributes.putValue("S4-App-Class", "path.to.Class"); + Attributes.Name name = new Attributes.Name("S4-App-Class"); + Attributes.Name S4Version = new Attributes.Name("S4-Version"); + manifest.getMainAttributes().put(name, "samoa.topology.impl.DoTaskApp"); + manifest.getMainAttributes().put(S4Version, "0.6.0-incubating"); + return manifest; + } + + public static Object getInstance(String className) { + Class<?> cls; + Object obj = null; + try { + cls = Class.forName(className); + obj = cls.newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + e.printStackTrace(); + } + return obj; + } +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java deleted file mode 100644 index 68eec91..0000000 --- a/samoa-api/src/test/java/com/yahoo/labs/samoa/core/DoubleVectorTest.java +++ /dev/null @@ -1,97 +0,0 @@ -package com.yahoo.labs.samoa.core; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import static org.hamcrest.CoreMatchers.*; -import static org.junit.Assert.*; - -import org.junit.Before; -import org.junit.Test; - -public class DoubleVectorTest { - private DoubleVector emptyVector, array5Vector; - - @Before - public void setUp() { - emptyVector = new DoubleVector(); - array5Vector = new DoubleVector(new double[] { 1.1, 2.5, 0, 4.7, 0 }); - } - - @Test - public void testGetArrayRef() { - assertThat(emptyVector.getArrayRef(), notNullValue()); - assertTrue(emptyVector.getArrayRef() == emptyVector.getArrayRef()); - assertEquals(5, array5Vector.getArrayRef().length); - } - - @Test - public void testGetArrayCopy() { - double[] arrayRef; - arrayRef = emptyVector.getArrayRef(); - assertTrue(arrayRef != emptyVector.getArrayCopy()); - assertThat(arrayRef, is(equalTo(emptyVector.getArrayCopy()))); - - arrayRef = array5Vector.getArrayRef(); - assertTrue(arrayRef != array5Vector.getArrayCopy()); - assertThat(arrayRef, is(equalTo(array5Vector.getArrayCopy()))); - } - - @Test - public void testNumNonZeroEntries() { - assertEquals(0, emptyVector.numNonZeroEntries()); - assertEquals(3, array5Vector.numNonZeroEntries()); - } - - @Test(expected = IndexOutOfBoundsException.class) - public void testGetValueOutOfBound() { - @SuppressWarnings("unused") - double value = emptyVector.getArrayRef()[0]; - } - - @Test() - public void testSetValue() { - // test automatic vector enlargement - emptyVector.setValue(0, 1.0); - assertEquals(1, emptyVector.getArrayRef().length); - assertEquals(1.0, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0 - - emptyVector.setValue(5, 5.5); - assertEquals(6, emptyVector.getArrayRef().length); - assertEquals(2, emptyVector.numNonZeroEntries()); - assertEquals(5.5, emptyVector.getArrayRef()[5], 0.0); // should be exactly the same, so delta=0.0 - } - - @Test - public void testAddToValue() { - array5Vector.addToValue(2, 5.0); - assertEquals(5, array5Vector.getArrayRef()[2], 0.0); // should be exactly the same, so delta=0.0 - - // test automatic vector enlargement - emptyVector.addToValue(0, 1.0); - assertEquals(1, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0 - } - - @Test - public void testSumOfValues() { - assertEquals(1.1 + 2.5 + 4.7, array5Vector.sumOfValues(), Double.MIN_NORMAL); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java deleted file mode 100644 index d64ca0e..0000000 --- a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/HDFSFileStreamSourceTest.java +++ /dev/null @@ -1,306 +0,0 @@ -package com.yahoo.labs.samoa.streams.fs; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.BufferedWriter; -import java.io.BufferedReader; -import java.io.File; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.util.Iterator; -import java.util.Set; -import java.util.HashSet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.MiniDFSCluster.Builder; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; - -import static org.junit.Assert.*; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class HDFSFileStreamSourceTest { - - private static final String[] HOSTS = { "localhost" }; - private static final String BASE_DIR = "/minidfsTest"; - private static final int NUM_FILES_IN_DIR = 4; - private static final int NUM_NOISE_FILES_IN_DIR = 2; - - private HDFSFileStreamSource streamSource; - - private Configuration config; - private MiniDFSCluster hdfsCluster; - private String hdfsURI; - - @Before - public void setUp() throws Exception { - // Start MiniDFSCluster - MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(new Configuration()).hosts(HOSTS).numDataNodes(1) - .format(true); - hdfsCluster = builder.build(); - hdfsCluster.waitActive(); - hdfsURI = "hdfs://localhost:" + hdfsCluster.getNameNodePort(); - - // Construct stream source - streamSource = new HDFSFileStreamSource(); - - // General config - config = new Configuration(); - config.set("fs.defaultFS", hdfsURI); - } - - @After - public void tearDown() throws Exception { - hdfsCluster.shutdown(); - } - - /* - * Init tests - */ - @Test - public void testInitWithSingleFileAndExtension() { - // write input file - writeSimpleFiles(BASE_DIR, "txt", 1); - - // init with path to input file - streamSource.init(config, BASE_DIR + "/1.txt", "txt"); - - // assertions - assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); - String fn = streamSource.getFilePathAt(0); - assertTrue("Incorrect file in filePaths.", - fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "1.txt")); - } - - @Test - public void testInitWithSingleFileAndNullExtension() { - // write input file - writeSimpleFiles(BASE_DIR, "txt", 1); - - // init with path to input file - streamSource.init(config, BASE_DIR + "/1.txt", null); - - // assertions - assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); - String fn = streamSource.getFilePathAt(0); - assertTrue("Incorrect file in filePaths.", - fn.equals(BASE_DIR + "/1.txt") || fn.equals(hdfsURI + BASE_DIR + "1.txt")); - } - - @Test - public void testInitWithFolderAndExtension() { - // write input files & noise files - writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); - writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, "txt"); - - // assertions - assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); - Set<String> filenames = new HashSet<String>(); - for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { - String targetFn = BASE_DIR + "/" + Integer.toString(i) + ".txt"; - filenames.add(targetFn); - filenames.add(hdfsURI + targetFn); - } - for (int i = 0; i < NUM_FILES_IN_DIR; i++) { - String fn = streamSource.getFilePathAt(i); - assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); - } - } - - @Test - public void testInitWithFolderAndNullExtension() { - // write input file - writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, null); - - // assertions - assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); - Set<String> filenames = new HashSet<String>(); - for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { - String targetFn = BASE_DIR + "/" + Integer.toString(i); - filenames.add(targetFn); - filenames.add(hdfsURI + targetFn); - } - for (int i = 0; i < NUM_FILES_IN_DIR; i++) { - String fn = streamSource.getFilePathAt(i); - assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); - } - } - - /* - * getNextInputStream tests - */ - @Test - public void testGetNextInputStream() { - // write input files & noise files - writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, "txt"); - - // call getNextInputStream & assertions - Set<String> contents = new HashSet<String>(); - for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { - contents.add(Integer.toString(i)); - } - for (int i = 0; i < NUM_FILES_IN_DIR; i++) { - InputStream inStream = streamSource.getNextInputStream(); - assertNotNull("Unexpected end of input stream list.", inStream); - - BufferedReader rd = new BufferedReader(new InputStreamReader(inStream)); - String inputRead = null; - try { - inputRead = rd.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:" + i + ioe.getMessage()); - } - assertTrue("File content is incorrect.", contents.contains(inputRead)); - Iterator<String> it = contents.iterator(); - while (it.hasNext()) { - if (it.next().equals(inputRead)) { - it.remove(); - break; - } - } - } - - // assert that another call to getNextInputStream will return null - assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream()); - } - - /* - * getCurrentInputStream tests - */ - public void testGetCurrentInputStream() { - // write input files & noise files - writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, "txt"); - - // call getNextInputStream, getCurrentInputStream & assertions - for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list - InputStream inStream1 = streamSource.getNextInputStream(); - InputStream inStream2 = streamSource.getCurrentInputStream(); - assertSame("Incorrect current input stream.", inStream1, inStream2); - } - } - - /* - * reset tests - */ - public void testReset() { - // write input files & noise files - writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(config, BASE_DIR, "txt"); - - // Get the first input string - InputStream firstInStream = streamSource.getNextInputStream(); - String firstInput = null; - assertNotNull("Unexpected end of input stream list.", firstInStream); - - BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream)); - try { - firstInput = rd1.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:0" + ioe.getMessage()); - } - - // call getNextInputStream a few times - streamSource.getNextInputStream(); - - // call reset, call next, assert that output is 1 (the first file) - try { - streamSource.reset(); - } catch (IOException ioe) { - fail("Fail resetting stream source." + ioe.getMessage()); - } - - InputStream inStream = streamSource.getNextInputStream(); - assertNotNull("Unexpected end of input stream list.", inStream); - - BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream)); - String inputRead = null; - try { - inputRead = rd2.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:0" + ioe.getMessage()); - } - assertEquals("File content is incorrect.", firstInput, inputRead); - } - - private void writeSimpleFiles(String path, String ext, int numOfFiles) { - // get filesystem - FileSystem dfs; - try { - dfs = hdfsCluster.getFileSystem(); - } catch (IOException ioe) { - fail("Could not access MiniDFSCluster" + ioe.getMessage()); - return; - } - - // create basedir - Path basedir = new Path(path); - try { - dfs.mkdirs(basedir); - } catch (IOException ioe) { - fail("Could not create DIR:" + path + "\n" + ioe.getMessage()); - return; - } - - // write files - for (int i = 1; i <= numOfFiles; i++) { - String fn = null; - if (ext != null) { - fn = Integer.toString(i) + "." + ext; - } else { - fn = Integer.toString(i); - } - - try { - OutputStream fin = dfs.create(new Path(path, fn)); - BufferedWriter wr = new BufferedWriter(new OutputStreamWriter(fin)); - wr.write(Integer.toString(i)); - wr.close(); - fin.close(); - } catch (IOException ioe) { - fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage()); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java b/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java deleted file mode 100644 index 5898421..0000000 --- a/samoa-api/src/test/java/com/yahoo/labs/samoa/streams/fs/LocalFileStreamSourceTest.java +++ /dev/null @@ -1,276 +0,0 @@ -package com.yahoo.labs.samoa.streams.fs; - -/* - * #%L - * SAMOA - * %% - * Copyright (C) 2014 - 2015 Apache Software Foundation - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.io.BufferedWriter; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileWriter; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.lang.SecurityException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; - -import static org.junit.Assert.*; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.apache.commons.io.FileUtils; - -public class LocalFileStreamSourceTest { - private static final String BASE_DIR = "localfsTest"; - private static final int NUM_FILES_IN_DIR = 4; - private static final int NUM_NOISE_FILES_IN_DIR = 2; - - private LocalFileStreamSource streamSource; - - @Before - public void setUp() throws Exception { - streamSource = new LocalFileStreamSource(); - - } - - @After - public void tearDown() throws Exception { - FileUtils.deleteDirectory(new File(BASE_DIR)); - } - - @Test - public void testInitWithSingleFileAndExtension() { - // write input file - writeSimpleFiles(BASE_DIR, "txt", 1); - - // init with path to input file - File inFile = new File(BASE_DIR, "1.txt"); - String inFilePath = inFile.getAbsolutePath(); - streamSource.init(inFilePath, "txt"); - - // assertions - assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); - String fn = streamSource.getFilePathAt(0); - assertEquals("Incorrect file in filePaths.", inFilePath, fn); - } - - @Test - public void testInitWithSingleFileAndNullExtension() { - // write input file - writeSimpleFiles(BASE_DIR, "txt", 1); - - // init with path to input file - File inFile = new File(BASE_DIR, "1.txt"); - String inFilePath = inFile.getAbsolutePath(); - streamSource.init(inFilePath, null); - - // assertions - assertEquals("Size of filePaths is not correct.", 1, streamSource.getFilePathListSize(), 0); - String fn = streamSource.getFilePathAt(0); - assertEquals("Incorrect file in filePaths.", inFilePath, fn); - } - - @Test - public void testInitWithFolderAndExtension() { - // write input file - writeSimpleFiles(BASE_DIR, null, NUM_NOISE_FILES_IN_DIR); - writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); - - // init with path to input dir - File inDir = new File(BASE_DIR); - String inDirPath = inDir.getAbsolutePath(); - streamSource.init(inDirPath, "txt"); - - // assertions - assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); - Set<String> filenames = new HashSet<String>(); - for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { - String expectedFn = (new File(inDirPath, Integer.toString(i) + ".txt")).getAbsolutePath(); - filenames.add(expectedFn); - } - for (int i = 0; i < NUM_FILES_IN_DIR; i++) { - String fn = streamSource.getFilePathAt(i); - assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); - } - } - - @Test - public void testInitWithFolderAndNullExtension() { - // write input file - writeSimpleFiles(BASE_DIR, null, NUM_FILES_IN_DIR); - - // init with path to input dir - File inDir = new File(BASE_DIR); - String inDirPath = inDir.getAbsolutePath(); - streamSource.init(inDirPath, null); - - // assertions - assertEquals("Size of filePaths is not correct.", NUM_FILES_IN_DIR, streamSource.getFilePathListSize(), 0); - Set<String> filenames = new HashSet<String>(); - for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { - String expectedFn = (new File(inDirPath, Integer.toString(i))).getAbsolutePath(); - filenames.add(expectedFn); - } - for (int i = 0; i < NUM_FILES_IN_DIR; i++) { - String fn = streamSource.getFilePathAt(i); - assertTrue("Incorrect file in filePaths:" + fn, filenames.contains(fn)); - } - } - - /* - * getNextInputStream tests - */ - @Test - public void testGetNextInputStream() { - // write input files & noise files - writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(BASE_DIR, "txt"); - - // call getNextInputStream & assertions - Set<String> contents = new HashSet<String>(); - for (int i = 1; i <= NUM_FILES_IN_DIR; i++) { - contents.add(Integer.toString(i)); - } - for (int i = 0; i < NUM_FILES_IN_DIR; i++) { - InputStream inStream = streamSource.getNextInputStream(); - assertNotNull("Unexpected end of input stream list.", inStream); - - BufferedReader rd = new BufferedReader(new InputStreamReader(inStream)); - String inputRead = null; - try { - inputRead = rd.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:" + i + ioe.getMessage()); - } - assertTrue("File content is incorrect.", contents.contains(inputRead)); - Iterator<String> it = contents.iterator(); - while (it.hasNext()) { - if (it.next().equals(inputRead)) { - it.remove(); - break; - } - } - } - - // assert that another call to getNextInputStream will return null - assertNull("Call getNextInputStream after the last file did not return null.", streamSource.getNextInputStream()); - } - - /* - * getCurrentInputStream tests - */ - public void testGetCurrentInputStream() { - // write input files & noise files - writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(BASE_DIR, "txt"); - - // call getNextInputStream, getCurrentInputStream & assertions - for (int i = 0; i <= NUM_FILES_IN_DIR; i++) { // test also after-end-of-list - InputStream inStream1 = streamSource.getNextInputStream(); - InputStream inStream2 = streamSource.getCurrentInputStream(); - assertSame("Incorrect current input stream.", inStream1, inStream2); - } - } - - /* - * reset tests - */ - public void testReset() { - // write input files & noise files - writeSimpleFiles(BASE_DIR, "txt", NUM_FILES_IN_DIR); - - // init with path to input dir - streamSource.init(BASE_DIR, "txt"); - - // Get the first input string - InputStream firstInStream = streamSource.getNextInputStream(); - String firstInput = null; - assertNotNull("Unexpected end of input stream list.", firstInStream); - - BufferedReader rd1 = new BufferedReader(new InputStreamReader(firstInStream)); - try { - firstInput = rd1.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:0" + ioe.getMessage()); - } - - // call getNextInputStream a few times - streamSource.getNextInputStream(); - - // call reset, call next, assert that output is 1 (the first file) - try { - streamSource.reset(); - } catch (IOException ioe) { - fail("Fail resetting stream source." + ioe.getMessage()); - } - - InputStream inStream = streamSource.getNextInputStream(); - assertNotNull("Unexpected end of input stream list.", inStream); - - BufferedReader rd2 = new BufferedReader(new InputStreamReader(inStream)); - String inputRead = null; - try { - inputRead = rd2.readLine(); - } catch (IOException ioe) { - fail("Fail reading from stream at index:0" + ioe.getMessage()); - } - assertEquals("File content is incorrect.", firstInput, inputRead); - } - - private void writeSimpleFiles(String path, String ext, int numOfFiles) { - // Create folder - File folder = new File(path); - if (!folder.exists()) { - try { - folder.mkdir(); - } catch (SecurityException se) { - fail("Failed creating directory:" + path + se); - } - } - - // Write files - for (int i = 1; i <= numOfFiles; i++) { - String fn = null; - if (ext != null) { - fn = Integer.toString(i) + "." + ext; - } else { - fn = Integer.toString(i); - } - - try { - FileWriter fwr = new FileWriter(new File(path, fn)); - fwr.write(Integer.toString(i)); - fwr.close(); - } catch (IOException ioe) { - fail("Fail writing to input file: " + fn + " in directory: " + path + ioe.getMessage()); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/9b178f63/samoa-api/src/test/java/org/apache/samoa/core/DoubleVectorTest.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/test/java/org/apache/samoa/core/DoubleVectorTest.java b/samoa-api/src/test/java/org/apache/samoa/core/DoubleVectorTest.java new file mode 100644 index 0000000..71b8f1b --- /dev/null +++ b/samoa-api/src/test/java/org/apache/samoa/core/DoubleVectorTest.java @@ -0,0 +1,98 @@ +package org.apache.samoa.core; + +/* + * #%L + * SAMOA + * %% + * Copyright (C) 2014 - 2015 Apache Software Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; + +import org.apache.samoa.core.DoubleVector; +import org.junit.Before; +import org.junit.Test; + +public class DoubleVectorTest { + private DoubleVector emptyVector, array5Vector; + + @Before + public void setUp() { + emptyVector = new DoubleVector(); + array5Vector = new DoubleVector(new double[] { 1.1, 2.5, 0, 4.7, 0 }); + } + + @Test + public void testGetArrayRef() { + assertThat(emptyVector.getArrayRef(), notNullValue()); + assertTrue(emptyVector.getArrayRef() == emptyVector.getArrayRef()); + assertEquals(5, array5Vector.getArrayRef().length); + } + + @Test + public void testGetArrayCopy() { + double[] arrayRef; + arrayRef = emptyVector.getArrayRef(); + assertTrue(arrayRef != emptyVector.getArrayCopy()); + assertThat(arrayRef, is(equalTo(emptyVector.getArrayCopy()))); + + arrayRef = array5Vector.getArrayRef(); + assertTrue(arrayRef != array5Vector.getArrayCopy()); + assertThat(arrayRef, is(equalTo(array5Vector.getArrayCopy()))); + } + + @Test + public void testNumNonZeroEntries() { + assertEquals(0, emptyVector.numNonZeroEntries()); + assertEquals(3, array5Vector.numNonZeroEntries()); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testGetValueOutOfBound() { + @SuppressWarnings("unused") + double value = emptyVector.getArrayRef()[0]; + } + + @Test() + public void testSetValue() { + // test automatic vector enlargement + emptyVector.setValue(0, 1.0); + assertEquals(1, emptyVector.getArrayRef().length); + assertEquals(1.0, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0 + + emptyVector.setValue(5, 5.5); + assertEquals(6, emptyVector.getArrayRef().length); + assertEquals(2, emptyVector.numNonZeroEntries()); + assertEquals(5.5, emptyVector.getArrayRef()[5], 0.0); // should be exactly the same, so delta=0.0 + } + + @Test + public void testAddToValue() { + array5Vector.addToValue(2, 5.0); + assertEquals(5, array5Vector.getArrayRef()[2], 0.0); // should be exactly the same, so delta=0.0 + + // test automatic vector enlargement + emptyVector.addToValue(0, 1.0); + assertEquals(1, emptyVector.getArrayRef()[0], 0.0); // should be exactly the same, so delta=0.0 + } + + @Test + public void testSumOfValues() { + assertEquals(1.1 + 2.5 + 4.7, array5Vector.sumOfValues(), Double.MIN_NORMAL); + } + +}
