http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java index 791ce3e..eae8b15 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/DagMeta.java @@ -27,8 +27,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.stream.api.Option; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.Attribute; import com.datatorrent.api.DAG; @@ -36,14 +41,15 @@ import com.datatorrent.api.Operator; import com.datatorrent.stram.plan.logical.LogicalPlan; /** - * Graph data structure for DAG - * With this data structure, the framework can do lazy load and optimization + * Logical graph data structure for DAG <br> + * + * With the build method({@link #buildDAG()}, {@link #buildDAG(DAG)}) to convert it to Apex DAG * * @since 3.4.0 */ [email protected] public class DagMeta { - private List<NodeMeta> heads = new LinkedList<>(); List<Pair<Attribute, Object>> dagAttributes = new LinkedList<>(); @@ -51,10 +57,10 @@ public class DagMeta public static class NodeMeta { - private String nodeName; - private Operator operator; + private Option[] options; + List<Pair<Attribute, Object>> operatorAttributes = new LinkedList<>(); private Map<Operator.OutputPort, Pair<List<Operator.InputPort>, DAG.Locality>> nodeStreams = new HashMap<>(); @@ -79,11 +85,6 @@ public class DagMeta return children; } - public String getNodeName() - { - return nodeName; - } - public Operator getOperator() { return operator; @@ -94,13 +95,13 @@ public class DagMeta return nodeStreams; } - public NodeMeta(Operator operator, String nodeName) + public NodeMeta(Operator operator, Option... options) { - this.nodeName = nodeName; - this.operator = operator; + this.options = options; + for (Field field : this.operator.getClass().getFields()) { int modifier = field.getModifiers(); if (Modifier.isPublic(modifier) && Modifier.isTransient(modifier) && @@ -122,6 +123,15 @@ public class DagMeta } } + public String getOperatorName() + { + for (Option opt : options) { + if (opt instanceof Option.OpName) { + return ((Option.OpName)opt).getName(); + } + } + return operator.toString(); + } } public DagMeta() @@ -141,11 +151,15 @@ public class DagMeta for (NodeMeta nm : heads) { visitNode(nm, dag); } + logger.debug("Finish building the dag:\n {}", dag.toString()); } private void visitNode(NodeMeta nm, DAG dag) { - dag.addOperator(nm.nodeName, nm.operator); + String opName = nm.getOperatorName(); + logger.debug("Building DAG: add operator {}: {}", opName, nm.operator); + dag.addOperator(opName, nm.operator); + for (NodeMeta child : nm.children) { visitNode(child, dag); } @@ -154,15 +168,18 @@ public class DagMeta if (entry.getKey() == null || entry.getValue().getKey() == null || 0 == entry.getValue().getKey().size()) { continue; } + logger.debug("Building DAG: add stream {} from {} to {}", entry.getKey().toString(), entry.getKey(), entry.getValue().getLeft().toArray(new Operator.InputPort[]{})); DAG.StreamMeta streamMeta = dag.addStream(entry.getKey().toString(), entry.getKey(), entry.getValue().getLeft().toArray(new Operator.InputPort[]{})); // set locality if (entry.getValue().getRight() != null) { + logger.debug("Building DAG: set locality of the stream {} to {}", entry.getKey().toString(), entry.getValue().getRight()); streamMeta.setLocality(entry.getValue().getRight()); } //set attributes for output port if (nm.outputPortAttributes.containsKey(entry.getKey())) { for (Pair<Attribute, Object> attr : nm.outputPortAttributes.get(entry.getKey())) { + logger.debug("Building DAG: set port attribute {} to {} for port {}", attr.getLeft(), attr.getValue(), entry.getKey()); dag.setOutputPortAttribute(entry.getKey(), attr.getLeft(), attr.getValue()); } } @@ -173,6 +190,7 @@ public class DagMeta //set input port attributes if (nm.inputPortAttributes.containsKey(input)) { for (Pair<Attribute, Object> attr : nm.inputPortAttributes.get(input)) { + logger.debug("Building DAG: set port attribute {} to {} for port {}", attr.getLeft(), attr.getValue(), input); dag.setInputPortAttribute(input, attr.getLeft(), attr.getValue()); } } @@ -180,15 +198,16 @@ public class DagMeta // set operator attributes for (Pair<Attribute, Object> attr : nm.operatorAttributes) { + logger.debug("Building DAG: set operator attribute {} to {} for operator {}", attr.getLeft(), attr.getValue(), nm.operator); dag.setAttribute(nm.operator, attr.getLeft(), attr.getValue()); } } - public NodeMeta addNode(String nodeName, Operator operator, NodeMeta parent, Operator.OutputPort parentOutput, Operator.InputPort inputPort) + public NodeMeta addNode(Operator operator, NodeMeta parent, Operator.OutputPort parentOutput, Operator.InputPort inputPort, Option... options) { - NodeMeta newNode = new NodeMeta(operator, nodeName); + NodeMeta newNode = new NodeMeta(operator, options); if (parent == null) { heads.add(newNode); } else { @@ -199,4 +218,6 @@ public class DagMeta return newNode; } + private static final Logger logger = LoggerFactory.getLogger(DagMeta.class); + }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java index 982980c..b5bc286 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/IDGenerator.java @@ -20,6 +20,8 @@ package org.apache.apex.malhar.stream.api.impl; import java.util.UUID; +import org.apache.hadoop.classification.InterfaceStability; + import com.datatorrent.api.Operator; import static java.lang.System.currentTimeMillis; @@ -29,6 +31,7 @@ import static java.lang.System.currentTimeMillis; * * @since 3.4.0 */ [email protected] public class IDGenerator { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java index 7af6ece..d6201ad 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/StreamFactory.java @@ -18,39 +18,59 @@ */ package org.apache.apex.malhar.stream.api.impl; +import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator; +import org.apache.apex.malhar.kafka.PartitionStrategy; import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.Option; +import org.apache.hadoop.classification.InterfaceStability; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator; import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator; +import static org.apache.apex.malhar.stream.api.Option.Options.name; + /** - * A Factory class to build from different kind of input source + * A Factory class to build stream from different input sources * * @since 3.4.0 */ [email protected] public class StreamFactory { - public static ApexStream<String> fromFolder(String inputOperatorName, String folderName) + /** + * Create a stream of string tuples from reading files in hdfs folder line by line + * @param folderName + * @param opts + * @return + */ + public static ApexStream<String> fromFolder(String folderName, Option... opts) { LineByLineFileInputOperator fileLineInputOperator = new LineByLineFileInputOperator(); fileLineInputOperator.setDirectory(folderName); ApexStreamImpl<String> newStream = new ApexStreamImpl<>(); - return newStream.addOperator(inputOperatorName, fileLineInputOperator, null, fileLineInputOperator.output); + return newStream.addOperator(fileLineInputOperator, null, fileLineInputOperator.output, opts); } public static ApexStream<String> fromFolder(String folderName) { - return fromFolder("FolderScanner", folderName); + return fromFolder(folderName, name("FolderScanner")); } public static ApexStream<String> fromKafka08(String zookeepers, String topic) { - return fromKafka08("Kafka08Input", zookeepers, topic); + return fromKafka08(zookeepers, topic, name("Kafka08Input")); } - public static ApexStream<String> fromKafka08(String inputName, String zookeepers, String topic) + /** + * Create a stream of string reading input from kafka 0.8 + * @param zookeepers + * @param topic + * @param opts + * @return + */ + public static ApexStream<String> fromKafka08(String zookeepers, String topic, Option... opts) { KafkaSinglePortStringInputOperator kafkaSinglePortStringInputOperator = new KafkaSinglePortStringInputOperator(); kafkaSinglePortStringInputOperator.getConsumer().setTopic(topic); @@ -59,25 +79,49 @@ public class StreamFactory return newStream.addOperator(kafkaSinglePortStringInputOperator, null, kafkaSinglePortStringInputOperator.outputPort); } - public static <T> ApexStream<T> fromInput(String inputOperatorName, InputOperator operator, Operator.OutputPort<T> outputPort) + /** + * Create a stream with any input operator + * @param operator + * @param outputPort + * @param opts + * @param <T> + * @return + */ + public static <T> ApexStream<T> fromInput(InputOperator operator, Operator.OutputPort<T> outputPort, Option... opts) { ApexStreamImpl<T> newStream = new ApexStreamImpl<>(); - return newStream.addOperator(inputOperatorName, operator, null, outputPort); + return newStream.addOperator(operator, null, outputPort, opts); } - public static <T> ApexStream<T> fromInput(InputOperator operator, Operator.OutputPort<T> outputPort) + /** + * Create stream of byte array messages from kafka 0.9 + * @param brokers + * @param topic + * @param opts + * @return + */ + public static ApexStream<byte[]> fromKafka09(String brokers, String topic, Option... opts) { - return fromInput(operator.toString(), operator, outputPort); + KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator(); + kafkaInput.setClusters(brokers); + kafkaInput.setTopics(topic); + ApexStreamImpl<String> newStream = new ApexStreamImpl<>(); + return newStream.addOperator(kafkaInput, null, kafkaInput.outputPort, opts); } - public static ApexStream<String> fromKafka09(String name, String brokers, String topic) + /** + * Create stream of byte array messages from kafka 0.9 with more partition options + */ + public static ApexStream<byte[]> fromKafka09(String brokers, String topic, PartitionStrategy strategy, int partitionNumber, Option... opts) { - throw new UnsupportedOperationException(); + KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator(); + kafkaInput.setClusters(brokers); + kafkaInput.setTopics(topic); + kafkaInput.setStrategy(strategy.name()); + kafkaInput.setInitialPartitionCount(partitionNumber); + ApexStreamImpl<String> newStream = new ApexStreamImpl<>(); + return newStream.addOperator(kafkaInput, null, kafkaInput.outputPort, opts); } - public static ApexStream<String> fromKafka09(String brokers, String topic) - { - return fromKafka09("KafkaInput", brokers, topic); - } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java new file mode 100644 index 0000000..1bc500d --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/TupleWrapperOperator.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.api.impl; + +import javax.validation.constraints.NotNull; + +import org.apache.apex.malhar.lib.window.Tuple; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Operator; +import com.datatorrent.api.Sink; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; + +/** + * A wrapper operator that intercept the tuples and convert them between {@link Tuple} + */ +public class TupleWrapperOperator implements InputOperator, Operator.CheckpointNotificationListener +{ + + public static class OutputPortWrapper extends DefaultOutputPort implements Sink + { + + @Override + public void put(Object o) + { + emit(o); + } + + @Override + public int getCount(boolean b) + { + // No Accumulation + return 0; + } + } + + public static class InputPortWrapper extends DefaultInputPort<Tuple> + { + + @NotNull + private DefaultInputPort input; + + public void setInput(DefaultInputPort input) + { + this.input = input; + } + + @Override + public void process(Tuple o) + { + input.process(o.getValue()); + } + + @Override + public Sink getSink() + { + return input.getSink(); + } + + @Override + public void setConnected(boolean connected) + { + input.setConnected(connected); + } + + @Override + public void setup(Context.PortContext context) + { + input.setup(context); + } + + @Override + public void teardown() + { + input.teardown(); + } + } + + @InputPortFieldAnnotation(optional = true) + public final transient OutputPortWrapper output1 = new OutputPortWrapper(); + + @InputPortFieldAnnotation(optional = true) + public final transient OutputPortWrapper output2 = new OutputPortWrapper(); + + @InputPortFieldAnnotation(optional = true) + public final transient OutputPortWrapper output3 = new OutputPortWrapper(); + + @InputPortFieldAnnotation(optional = true) + public final transient OutputPortWrapper output4 = new OutputPortWrapper(); + + @InputPortFieldAnnotation(optional = true) + public final transient OutputPortWrapper output5 = new OutputPortWrapper(); + + @InputPortFieldAnnotation(optional = true) + public final transient InputPortWrapper input1 = new InputPortWrapper(); + + @InputPortFieldAnnotation(optional = true) + public final transient InputPortWrapper input2 = new InputPortWrapper(); + + @InputPortFieldAnnotation(optional = true) + public final transient InputPortWrapper input3 = new InputPortWrapper(); + + @InputPortFieldAnnotation(optional = true) + public final transient InputPortWrapper input4 = new InputPortWrapper(); + + @InputPortFieldAnnotation(optional = true) + public final transient InputPortWrapper input5 = new InputPortWrapper(); + + //delegate to + @NotNull + private Operator operator; + + public void setOperator(Operator operator) + { + this.operator = operator; + } + + @Override + public void beginWindow(long l) + { + operator.beginWindow(l); + } + + @Override + public void endWindow() + { + operator.endWindow(); + } + + @Override + public void setup(Context.OperatorContext context) + { + operator.endWindow(); + } + + @Override + public void teardown() + { + operator.teardown(); + } + + @Override + public void beforeCheckpoint(long l) + { + if (operator instanceof CheckpointNotificationListener) { + ((CheckpointNotificationListener)operator).beforeCheckpoint(l); + } + } + + @Override + public void checkpointed(long l) + { + if (operator instanceof CheckpointNotificationListener) { + ((CheckpointNotificationListener)operator).checkpointed(l); + } + } + + @Override + public void committed(long l) + { + if (operator instanceof CheckpointNotificationListener) { + ((CheckpointNotificationListener)operator).committed(l); + } + } + + @Override + public void emitTuples() + { + if (operator instanceof InputOperator) { + ((InputOperator)operator).emitTuples(); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java new file mode 100644 index 0000000..68f1b9e --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/Count.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.api.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang3.mutable.MutableLong; + +/** + * Count Accumulation + */ +public class Count implements Accumulation<Long, MutableLong, Long> +{ + + @Override + public MutableLong defaultAccumulatedValue() + { + return new MutableLong(0); + } + + @Override + public MutableLong accumulate(MutableLong accumulatedValue, Long input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Long getOutput(MutableLong accumulatedValue) + { + return accumulatedValue.getValue(); + } + + @Override + public Long getRetraction(Long value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java new file mode 100644 index 0000000..3ab6892 --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/FoldFn.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.api.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Fold Accumulation Adaptor class + */ +public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT> +{ + + public FoldFn() + { + } + + public FoldFn(OUTPUT initialVal) + { + this.initialVal = initialVal; + } + + private OUTPUT initialVal; + + @Override + public OUTPUT defaultAccumulatedValue() + { + return initialVal; + } + + @Override + public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input) + { + return fold(accumulatedValue, input); + } + + @Override + public OUTPUT getOutput(OUTPUT accumulatedValue) + { + return accumulatedValue; + } + + @Override + public OUTPUT getRetraction(OUTPUT value) + { + return null; + } + + abstract OUTPUT fold(OUTPUT result, INPUT input); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java new file mode 100644 index 0000000..b4507bc --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/ReduceFn.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.api.impl.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * An easy to use reduce Accumulation + * @param <INPUT> + */ +public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT> +{ + @Override + public INPUT defaultAccumulatedValue() + { + return null; + } + + @Override + public INPUT accumulate(INPUT accumulatedValue, INPUT input) + { + if (accumulatedValue == null) { + return input; + } + return reduce(accumulatedValue, input); + } + + @Override + public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2) + { + return reduce(accumulatedValue1, accumulatedValue2); + } + + @Override + public INPUT getOutput(INPUT accumulatedValue) + { + return accumulatedValue; + } + + @Override + public INPUT getRetraction(INPUT value) + { + return null; + } + + public abstract INPUT reduce(INPUT input1, INPUT input2); + + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java new file mode 100644 index 0000000..77a08a6 --- /dev/null +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/accumulation/TopN.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.api.impl.accumulation; + +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * TopN accumulation + */ +public class TopN<T> implements Accumulation<T, List<T>, List<T>> +{ + + int n; + + Comparator<T> comparator; + + public void setN(int n) + { + this.n = n; + } + + public void setComparator(Comparator<T> comparator) + { + this.comparator = comparator; + } + + @Override + public List<T> defaultAccumulatedValue() + { + return new LinkedList<>(); + } + + @Override + public List<T> accumulate(List<T> accumulatedValue, T input) + { + int k = 0; + for (T inMemory : accumulatedValue) { + if (comparator != null) { + if (comparator.compare(inMemory, input) < 0) { + break; + } + } else if (input instanceof Comparable) { + if (((Comparable<T>)input).compareTo(inMemory) > 0) { + break; + } + } else { + throw new RuntimeException("Tuple cannot be compared"); + } + k++; + } + accumulatedValue.add(k, input); + if (accumulatedValue.size() > n) { + accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1)); + } + return accumulatedValue; + } + + @Override + public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2) + { + accumulatedValue1.addAll(accumulatedValue2); + if (comparator != null) { + Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator)); + } else { + Collections.sort(accumulatedValue1, Collections.reverseOrder()); + } + if (accumulatedValue1.size() > n) { + return accumulatedValue1.subList(0, n); + } else { + return accumulatedValue1; + } + } + + @Override + public List<T> getOutput(List<T> accumulatedValue) + { + return accumulatedValue; + } + + @Override + public List<T> getRetraction(List<T> accumulatedValue) + { + return new LinkedList<>(); + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java index cc85f37..b0fe3c5 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/AnnonymousClassModifier.java @@ -18,6 +18,8 @@ */ package org.apache.apex.malhar.stream.api.operator; +import org.apache.hadoop.classification.InterfaceStability; + import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.AnnotationVisitor; import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Attribute; import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassVisitor; @@ -33,6 +35,7 @@ import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes; * * @since 3.4.0 */ [email protected] public class AnnonymousClassModifier extends ClassVisitor { private String className; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java index 9194dc2..05a791c 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/ByteArrayClassLoader.java @@ -21,9 +21,12 @@ package org.apache.apex.malhar.stream.api.operator; import java.util.Map; +import org.apache.hadoop.classification.InterfaceStability; + /** * @since 3.4.0 */ [email protected] public class ByteArrayClassLoader extends ClassLoader { private final Map<String, byte[]> classes; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java index 0a8ba55..1e2066c 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/operator/FunctionOperator.java @@ -26,10 +26,11 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import javax.validation.constraints.NotNull; - +import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.util.TupleUtil; import org.apache.commons.io.IOUtils; +import org.apache.hadoop.classification.InterfaceStability; import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassReader; import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.ClassWriter; @@ -39,12 +40,15 @@ import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; /** * Operators that wrap the functions * * @since 3.4.0 */ [email protected] public class FunctionOperator<OUT, FUNCTION extends Function> implements Operator { private byte[] annonymousFunctionClass; @@ -57,8 +61,12 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato protected boolean isAnnonymous = false; + @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<OUT> output = new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Tuple<OUT>> tupleOutput = new DefaultOutputPort<>(); + public FunctionOperator(FUNCTION f) { isAnnonymous = f.getClass().isAnonymousClass(); @@ -245,6 +253,7 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato } + @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() { @Override @@ -255,6 +264,21 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato } }; + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() + { + @Override + public void process(Tuple<IN> t) + { + Function.MapFunction<IN, OUT> f = getFunction(); + if (t instanceof Tuple.PlainTuple) { + TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, f.f(t.getValue())); + } else { + output.emit(f.f(t.getValue())); + } + } + }; + public MapFunctionOperator(Function.MapFunction<IN, OUT> f) { super(f); @@ -269,6 +293,8 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato } + + @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() { @Override @@ -281,93 +307,65 @@ public class FunctionOperator<OUT, FUNCTION extends Function> implements Operato } }; - public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f) - { - super(f); - } - } - - public static class FoldFunctionOperator<IN, OUT> extends FunctionOperator<OUT, Function.FoldFunction<IN, OUT>> - { - - public FoldFunctionOperator() - { - - } - - @NotNull - private OUT foldVal; - - public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() { @Override - public void process(IN t) + public void process(Tuple<IN> t) { - Function.FoldFunction<IN, OUT> f = getFunction(); - // fold the value - foldVal = f.fold(t, foldVal); - output.emit(foldVal); + Function.FlatMapFunction<IN, OUT> f = getFunction(); + if (t instanceof Tuple.PlainTuple) { + for (OUT out : f.f(t.getValue())) { + tupleOutput.emit(TupleUtil.buildOf((Tuple.PlainTuple<IN>)t, out)); + } + } else { + for (OUT out : f.f(t.getValue())) { + output.emit(out); + } + } } }; - public FoldFunctionOperator(Function.FoldFunction<IN, OUT> f, OUT initialVal) + public FlatMapFunctionOperator(Function.FlatMapFunction<IN, OUT> f) { super(f); - this.foldVal = initialVal; } } - public static class ReduceFunctionOperator<IN> extends FunctionOperator<IN, Function.ReduceFunction<IN>> + + public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>> { - public ReduceFunctionOperator() + public FilterFunctionOperator() { } - @NotNull - private IN reducedVal; - + @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() { @Override public void process(IN t) { - Function.ReduceFunction<IN> f = getFunction(); + Function.FilterFunction<IN> f = getFunction(); // fold the value - if (reducedVal == null) { - reducedVal = t; - return; + if (f.f(t)) { + output.emit(t); } - reducedVal = f.reduce(t, reducedVal); - output.emit(reducedVal); } }; - public ReduceFunctionOperator(Function.ReduceFunction<IN> f) - { - super(f); - } - } - - public static class FilterFunctionOperator<IN> extends FunctionOperator<IN, Function.FilterFunction<IN>> - { - - public FilterFunctionOperator() - { - - } - - public final transient DefaultInputPort<IN> input = new DefaultInputPort<IN>() + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<Tuple<IN>> tupleInput = new DefaultInputPort<Tuple<IN>>() { @Override - public void process(IN t) + public void process(Tuple<IN> t) { Function.FilterFunction<IN> f = getFunction(); - // fold the value - if (f.f(t)) { - output.emit(t); + if (f.f(t.getValue())) { + tupleOutput.emit(t); } + } }; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java deleted file mode 100644 index 3641189..0000000 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/KeyedTuple.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.stream.api.util; - -/** - * An interface indicate a tuple with a specific key - * It is used internally to identify the key from the tuple - * - * @since 3.4.0 - */ -public interface KeyedTuple<K> -{ - /** - * Return the key of the tuple - * @return - */ - K getKey(); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java index 583615a..04f42b3 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/util/TupleUtil.java @@ -18,6 +18,11 @@ */ package org.apache.apex.malhar.stream.api.util; +import java.util.List; + +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.Window; + /** * The tuple util will be used to extract fields that are used as key or value<br> * Or converting from data tuples to display tuples <br> @@ -29,8 +34,22 @@ package org.apache.apex.malhar.stream.api.util; public class TupleUtil { - public static interface NONE + public static <T, O> Tuple.PlainTuple<O> buildOf(Tuple.PlainTuple<T> t, O newValue) { + if (t instanceof Tuple.WindowedTuple) { + Tuple.WindowedTuple<O> newT = new Tuple.WindowedTuple<>(); + List<Window> wins = ((Tuple.WindowedTuple)t).getWindows(); + for (Window w : wins) { + newT.addWindow(w); + } + newT.setValue(newValue); + ((Tuple.WindowedTuple)t).setTimestamp(((Tuple.WindowedTuple)t).getTimestamp()); + return newT; + } else if (t instanceof Tuple.TimestampedTuple) { + return new Tuple.TimestampedTuple<>(((Tuple.TimestampedTuple)t).getTimestamp(), newValue); + } else { + return new Tuple.PlainTuple<>(newValue); + } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java index 34820b6..9d03f2a 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java @@ -269,7 +269,7 @@ public class FunctionOperatorTest = new FunctionOperator.FilterFunctionOperator<Integer>(new Function.FilterFunction<Integer>() { @Override - public Boolean f(Integer in) + public boolean f(Integer in) { return in % divider == 0; } @@ -309,7 +309,7 @@ public class FunctionOperatorTest .filter(new Function.FilterFunction<Integer>() { @Override - public Boolean f(Integer in) + public boolean f(Integer in) { return in % divider == 0; } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java index 71b9a82..99d5ca6 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java @@ -33,6 +33,8 @@ import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.stram.plan.logical.LogicalPlan; +import static org.apache.apex.malhar.stream.api.Option.Options.name; + /** * Unit test to default implementation of ApexStream interface */ @@ -45,8 +47,8 @@ public class ApexStreamImplTest LogicalPlan dag = new LogicalPlan(); TestOperator<String, Integer> firstOperator = new TestOperator<>(); TestOperator<Integer, Date> secondOperator = new TestOperator<>(); - new ApexStreamImpl<String>().addOperator("first", firstOperator, null, firstOperator.output) - .addOperator("second", secondOperator, secondOperator.input, null) + new ApexStreamImpl<String>().addOperator(firstOperator, null, firstOperator.output, name("first")) + .endWith(secondOperator, secondOperator.input, name("second")) .with(DAG.Locality.THREAD_LOCAL) .with(Context.OperatorContext.AUTO_RECORD, true) .with("prop", "TestProp").populateDag(dag); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java index 44f76b1..f65806e 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/ApplicationWithStreamAPI.java @@ -20,6 +20,11 @@ package org.apache.apex.malhar.stream.sample; import java.util.Arrays; +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; @@ -28,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.util.KeyValPair; /** * An application example with stream api @@ -40,18 +46,27 @@ public class ApplicationWithStreamAPI implements StreamingApplication public void populateDAG(DAG dag, Configuration configuration) { String localFolder = "./src/test/resources/data"; - ApexStream stream = StreamFactory + ApexStream<String> stream = StreamFactory .fromFolder(localFolder) .flatMap(new Function.FlatMapFunction<String, String>() { @Override public Iterable<String> f(String input) { - return Arrays.asList(input.split(" ")); + return Arrays.asList(input.split("[\\p{Punct}\\s]+")); } }); stream.print(); - stream.countByKey().print(); + stream.window(new WindowOption.GlobalWindow(), new TriggerOption().withEarlyFiringsAtEvery(Duration + .millis(1000)).accumulatingFiredPanes()).countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple(new KeyValPair<>(input, 1L)); + } + }).print(); stream.populateDag(dag); + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java index d679135..f46fb14 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/LocalTestWithoutStreamApplication.java @@ -27,9 +27,13 @@ import java.util.concurrent.Callable; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import com.datatorrent.lib.util.KeyValPair; /** * A embedded application test without creating Streaming Application @@ -41,25 +45,32 @@ public class LocalTestWithoutStreamApplication public void testNonStreamApplicationWordcount() throws Exception { - TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>(); + TupleCollector<Tuple.WindowedTuple<KeyValPair<String, Long>>> collector = new TupleCollector<>(); collector.id = "testNonStreamApplicationWordcount"; - final Map<Object, Integer> expected = new HashMap<>(); - expected.put("error", 2); - expected.put("word1", 4); - expected.put("word2", 8); - expected.put("word3", 4); - expected.put("word4", 4); - expected.put("word5", 4); - expected.put("word7", 4); - expected.put("word9", 6); + final Map<String, Long> expected = new HashMap<>(); + expected.put("error", 2L); + expected.put("word1", 4L); + expected.put("word2", 8L); + expected.put("word3", 4L); + expected.put("word4", 4L); + expected.put("word5", 4L); + expected.put("word7", 4L); + expected.put("word9", 6L); Callable<Boolean> exitCondition = new Callable<Boolean>() { @Override public Boolean call() throws Exception { - List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get("testNonStreamApplicationWordcount"); - return (data != null) && data.size() >= 1 && expected.equals(data.get(data.size() - 1)); + if (!TupleCollector.results.containsKey("testNonStreamApplicationWordcount") || TupleCollector.results.get("testNonStreamApplicationWordcount").isEmpty()) { + return false; + } + Map<String, Long> data = new HashMap<>(); + for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : + (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get("testNonStreamApplicationWordcount")) { + data.put(entry.getValue().getKey(), entry.getValue().getValue()); + } + return data.size() >= 8 && expected.equals(data); } }; @@ -73,13 +84,26 @@ public class LocalTestWithoutStreamApplication return Arrays.asList(input.split(" ")); } }) - .countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition); + .window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(1)) + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple(new KeyValPair<>(input, 1L)); + } + }).addOperator(collector, collector.inputPort, null).runEmbedded(false, 30000, exitCondition); + Map<String, Long> data = new HashMap<>(); - List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get("testNonStreamApplicationWordcount"); + for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : + (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get("testNonStreamApplicationWordcount")) { + data.put(entry.getValue().getKey(), entry.getValue().getValue()); + } + //Thread.sleep(100000); Assert.assertNotNull(data); Assert.assertTrue(data.size() > 1); - Assert.assertEquals(expected, data.get(data.size() - 1)); + Assert.assertEquals(expected, data); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java index 4958a8e..20d7aed 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStream.java @@ -18,7 +18,6 @@ */ package org.apache.apex.malhar.stream.sample; -import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl; @@ -30,7 +29,7 @@ import com.datatorrent.api.DAG; public class MyStream<T> extends ApexStreamImpl<T> { - public MyStream(ApexStream<T> apexStream) + public MyStream(ApexStreamImpl<T> apexStream) { super(apexStream); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java index b2d1e8b..5e48974 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/MyStreamTest.java @@ -24,37 +24,51 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Callable; +import org.joda.time.Duration; import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.ApexStreamImpl; import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import com.datatorrent.lib.util.KeyValPair; + /** * A test class which test your own stream implementation build on default one */ @SuppressWarnings("unchecked") public class MyStreamTest { - static Map<Object, Integer> expected = new HashMap<>(); + static Map<String, Long> expected = new HashMap<>(); static String testId = null; static Callable<Boolean> exitCondition = null; static { - expected.put("newword1", 4); - expected.put("newword2", 8); - expected.put("newword3", 4); - expected.put("newword4", 4); - expected.put("newword5", 4); - expected.put("newword7", 4); - expected.put("newword9", 6); + expected.put("newword1", 4L); + expected.put("newword2", 8L); + expected.put("newword3", 4L); + expected.put("newword4", 4L); + expected.put("newword5", 4L); + expected.put("newword7", 4L); + expected.put("newword9", 6L); exitCondition = new Callable<Boolean>() { @Override public Boolean call() throws Exception { - List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId); - return (data != null) && data.size() >= 1 && expected.equals(data.get(data.size() - 1)); + if (!TupleCollector.results.containsKey(testId) || TupleCollector.results.get(testId).isEmpty()) { + return false; + } + Map<String, Long> dataMap = new HashMap<>(); + List<Tuple.TimestampedTuple<KeyValPair<String, Long>>> data = (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get(testId); + for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : data) { + dataMap.put(entry.getValue().getKey(), entry.getValue().getValue()); + } + return (dataMap != null) && dataMap.size() >= 1 && expected.equals(dataMap); } }; } @@ -65,9 +79,9 @@ public class MyStreamTest testId = "testMethodChainWordcount"; - TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>(); + TupleCollector<Tuple.WindowedTuple<KeyValPair<String, Long>>> collector = new TupleCollector<>(); collector.id = testId; - new MyStream<>(StreamFactory.fromFolder("./src/test/resources/data")) + new MyStream<>((ApexStreamImpl<String>)StreamFactory.fromFolder("./src/test/resources/data")) .<String, MyStream<String>>flatMap(new Function.FlatMapFunction<String, String>() { @Override @@ -85,17 +99,28 @@ public class MyStreamTest }, new Function.FilterFunction<String>() { @Override - public Boolean f(String input) + public boolean f(String input) { return input.startsWith("word"); } - }).countByKey() - .addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition); + }).window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.millis(1000))) + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple(new KeyValPair<>(input, 1L)); + } + }).addOperator(collector, collector.inputPort, null) + .runEmbedded(false, 30000, exitCondition); + Map<String, Long> dataMap = new HashMap<>(); + for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get(testId)) { + dataMap.put(entry.getValue().getKey(), entry.getValue().getValue()); + } - List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId); - Assert.assertTrue(data.size() > 1); - Assert.assertEquals(expected, data.get(data.size() - 1)); + Assert.assertTrue(dataMap.size() > 1); + Assert.assertEquals(expected, dataMap); } @Test @@ -103,9 +128,9 @@ public class MyStreamTest { testId = "testNonMethodChainWordcount"; - TupleCollector<Map<Object, Integer>> collector = new TupleCollector<>(); + TupleCollector<Tuple.WindowedTuple<KeyValPair<String, Long>>> collector = new TupleCollector<>(); collector.id = testId; - MyStream<String> mystream = new MyStream<>(StreamFactory + MyStream<String> mystream = new MyStream<>((ApexStreamImpl<String>)StreamFactory .fromFolder("./src/test/resources/data")) .flatMap(new Function.FlatMapFunction<String, String>() { @@ -125,16 +150,28 @@ public class MyStreamTest }, new Function.FilterFunction<String>() { @Override - public Boolean f(String input) + public boolean f(String input) { return input.startsWith("word"); } - }).countByKey().addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition); + }).window(new WindowOption.GlobalWindow(), new TriggerOption().accumulatingFiredPanes().withEarlyFiringsAtEvery(Duration.millis(1000))) + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple(new KeyValPair<>(input, 1L)); + } + }).addOperator(collector, collector.inputPort, collector.outputPort).print().runEmbedded(false, 30000, exitCondition); + + Map<String, Long> dataMap = new HashMap<>(); + for (Tuple.TimestampedTuple<KeyValPair<String, Long>> entry : (List<Tuple.TimestampedTuple<KeyValPair<String, Long>>>)TupleCollector.results.get(testId)) { + dataMap.put(entry.getValue().getKey(), entry.getValue().getValue()); + } - List<Map<Object, Integer>> data = (List<Map<Object, Integer>>)TupleCollector.results.get(testId); - Assert.assertTrue(data.size() > 1); - Assert.assertEquals(expected, data.get(data.size() - 1)); + Assert.assertTrue(dataMap.size() > 1); + Assert.assertEquals(expected, dataMap); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java index a5c644d..94667c9 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/TupleCollector.java @@ -25,6 +25,7 @@ import java.util.Map; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.common.util.BaseOperator; /** @@ -37,6 +38,7 @@ public class TupleCollector<T> extends BaseOperator public final transient CollectorInputPort<T> inputPort = new CollectorInputPort<>(this); + @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<>(); public String id = ""; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java new file mode 100644 index 0000000..14ff066 --- /dev/null +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WCInput.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +public class WCInput extends BaseOperator implements InputOperator +{ + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + private transient BufferedReader reader; + + @Override + public void setup(Context.OperatorContext context) + { + initReader(); + } + + private void initReader() + { + try { + Path myPath = new Path("/user/siyuan/wc/wordcount"); + FileSystem fs = FileSystem.get(new Configuration()); + reader = new BufferedReader(new InputStreamReader(fs.open(myPath))); + } catch (Exception ex) { + throw Throwables.propagate(ex); + } + } + + @Override + public void teardown() + { + IOUtils.closeQuietly(reader); + } + + @Override + public void emitTuples() + { + try { + String line = reader.readLine(); + if (line == null) { + reader.close(); + initReader(); + } else { + // simulate late data + //long timestamp = System.currentTimeMillis() - (long)(Math.random() * 30000); + + this.output.emit(line); + } + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + + @Override + public void endWindow() + { + //this.controlOutput.emit(new WatermarkImpl(System.currentTimeMillis() - 15000)); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java new file mode 100644 index 0000000..11dabe4 --- /dev/null +++ b/stream/src/test/java/org/apache/apex/malhar/stream/sample/WordCountWithStreamAPI.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.stream.sample; + +import java.util.Arrays; + +import org.joda.time.Duration; + +import org.apache.apex.malhar.lib.window.TriggerOption; +import org.apache.apex.malhar.lib.window.Tuple; +import org.apache.apex.malhar.lib.window.WindowOption; +import org.apache.apex.malhar.stream.api.ApexStream; +import org.apache.apex.malhar.stream.api.function.Function; +import org.apache.apex.malhar.stream.api.impl.StreamFactory; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Word count with streaming API + */ +@ApplicationAnnotation(name = "WCDemo") +public class WordCountWithStreamAPI implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration configuration) + { + WCInput wcInput = new WCInput(); + ApexStream<String> stream = StreamFactory + .fromInput(wcInput, wcInput.output) + .flatMap(new Function.FlatMapFunction<String, String>() + { + @Override + public Iterable<String> f(String input) + { + return Arrays.asList(input.split("[\\p{Punct}\\s]+")); + } + }); + stream.print(); + stream.window(new WindowOption.GlobalWindow(), new TriggerOption().withEarlyFiringsAtEvery(Duration + .millis(1000)).accumulatingFiredPanes()) + .countByKey(new Function.ToKeyValue<String, String, Long>() + { + @Override + public Tuple<KeyValPair<String, Long>> f(String input) + { + return new Tuple.PlainTuple(new KeyValPair<>(input, 1L)); + } + }).print(); + stream.populateDag(dag); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/266b0411/stream/src/test/resources/sampletweets.txt ---------------------------------------------------------------------- diff --git a/stream/src/test/resources/sampletweets.txt b/stream/src/test/resources/sampletweets.txt new file mode 100644 index 0000000..379d424 --- /dev/null +++ b/stream/src/test/resources/sampletweets.txt @@ -0,0 +1,207 @@ +Tweet content +"Apple has $233 billion in cash. It could buy all + +â@NFL teams +â@NBA teams +â@MLB teams +â@NHL teams + +...and still have $80 billion left. $AAPL" +Read $MRNJ #NEWS, $HEMP & $GRCU r above .01, that's where we r goinð $SPY $MSFT $SBUX $SFOR $VRX $AAPL $TSLA $GOOG $FB $EURUSD $USDJPY $MLCG +$RXSF is leadin their sector accordin 2 @EdisonMediaCen $AAPL $SPY $TSLA $FB $EURUSD $ALK $IBB $EW $AMZN $GBPUSD $GM https://t.co/LYY2mHn755 +Philstockworld Top Trade Review $AAPL $MSFT #Dividends $USO $HOV $TWTR -- https://t.co/JArXsIm7CI https://t.co/kRR9ezhm9E +Philstockworld Top Trade Review: $AAPL $ABX $BA $CAKE $CMG $DIS $IBM $GILD $LL $UNG $SPY -- https://t.co/EX5SYjdwBC https://t.co/7FBZwVZ63v +"Mondayâs Oil Mess: Rent-A-Rebel Jacks up Prices into the Holiday $USO $AAPL +#Earnings -- https://t.co/cGHB3WDKA8 https://t.co/JFZIBcom1n" +Meaningless Monday Market Movement! $AAPL $SQQQ #oil #Brexit https://t.co/j4Iqg7E1HN +"S&P Futures Back over 2,050, for Now +$SPY $AAPL $SQQQ #China #Debt #Hedging -- https://t.co/2dOc5T89S3 https://t.co/TDPVdNRNQF" +"ð¥TURN YOUR $500 INTO $5,000+ð¥ + +JOIN #TEAMBILLIONAIRE⤵ +ð§ [email protected] + +#PENNYSTOCKS $AAPL $GSAT $MGT +https://t.co/lwAGjfmIP3" +Trendless Tuesday - Watch Yesterdayâs Fake Gains Disappear $AAPL #China $FXI #Earnings -- https://t.co/GpgGqoOlFn https://t.co/FRuixv5aZF +"ð¥TURN YOUR $500 INTO $5,000+ð¥ + +JOIN #TEAMBILLIONAIRE⤵ +ð§ [email protected] + +#PENNYSTOCKS $AAPL $UVXY $JDST +https://t.co/lwAGjfmIP3" +"Apple has $233 billion in cash. It could buy: + +Uber +Tesla +Twitter +Airbnb +Netflix +Yahoo + +...and still have $18 billion left. $AAPL" +Option Opportunity Portfolio May Review â Up 19.3% In 30 Days! $ABX $FCX $USO $AAPL $DIS - https://t.co/rp3kMsRZ3E https://t.co/TKkc15pKcR +Waiting for the Fed â Apple Gives Us Huge Wins: $AAPL $SQQQ #GDP #Nikkei #Futures #Oil -- https://t.co/Al3pkf350V https://t.co/LktIRF4F2b +Tempting Tuesday - S&P 2,100 is Still the Line to Watch Ahead of the Fed $AAPL $QQQ -- https://t.co/t1eDfKHJnk https://t.co/BAW3RAe7SC +Our $SQQQ Hedge is Up 314% and Our Futures Are Up $4,850, You're Welcome! $AAPL -- https://t.co/eUQ2kCkCOY https://t.co/Yk98oyqMZl +"TURN YOUR ð²500 INTO ð²5,000$ð¥ + +JOIN #TEAMBILLIONAIRE ⤵ +ð§ [email protected] + +#PENNYSTOCKS $TWTR $AAPL $LNKD +https://t.co/euJFNQX1g4" +"TURN YOUR ð²500 INTO ð²5,000$ð¥ + +JOIN #TEAMBILLIONAIRE ⤵ +ð§ [email protected] + +#PENNYSTOCKS $TALK $PPPI $AAPL https://t.co/oSn11kxftM" +Bears today. We getting paid! $AAPL $TWTR $BWLD $NFLX https://t.co/CCi0S3skJJ +"Apple has $233 billion in cash. It could buy all + +â@NFL teams +â@NBA teams +â@MLB teams +â@NHL teams + +...and still have $80 billion left. $AAPL" +Are you in Sync with the market? https://t.co/ZtHHCrSAf8 #stocks #finance #investing #trading $AAPL $LNKD $NFLX $GOOGL $FB +The Last Time These Insiders Purchased This Stock It Sky Rocketed 1000%+ https://t.co/bmNAHBoQBD $DIA $QQQ $SPY $GOOG $AAPL $BAC $TWTR $FB +"This Hacker Made Amazonâs Alexa, Google Now, and Siri Command One Another +https://t.co/YXP3yqmf4H $AAPL $AMZN $GOOG https://t.co/NG7r6qgfRt" +"Over the last 3 years, the top 14 automakers upped their combined R&D spend by $192 million. + +$AAPL upped R&D spend by $5 billion. + +- MS" +Volatility can be your friend. https://t.co/aHz2r8HHD2 #stocks #trading #investing #financials #learntodaytrade $FB $AAPL $LNKD $NFLX +"PERCENTAGE of Apple's Revenues: +FY 2006: +iPod 40% +Mac 38% +Services 10% +Others 12% + +FY 2015: +iPhone 66% +Mac 11% +iPad 10% +Others 13% + +$AAPL" +Apple recovered $40 million worth of gold from recycled iPhones, iPads & Macs in 2015. https://t.co/XPBWlM6cBs $AAPL https://t.co/P0LMSRw7Ot +"Apple's iPhone sales sink for 1st time ever last quarter +https://t.co/TAKjUwl4Yc @DavidGoldmanCNN @cnntech $AAPL https://t.co/OrDp4BDpsD" +$BAC is down 5% since our article was posted on Friday https://t.co/al8AgaSsiI $DIA $QQQ $SPY $AAPL $GOOG $FB $TWTR $STUDY $NFLX $LNKD $IBM +Ben Franklin: The First Proponent Of Dividend Growth Investing? https://t.co/dx7FE2G9AH $AAPL $ACN $AL $BEN $CSV $HON $IJR $JNJ $JWN $PEGI +$5,000 Friday the 13th - Yesterday's Futures Trades Pay Off Nicely $USO $SPY $AAPL -- https://t.co/3RUEjAq1bO https://t.co/2L7cdebTlT +I DON'T SEE ANY BUBBLE RIGHT NOW , I SWEAR ! $SPX $SPY $DIA $DJI $AAPL $VIX $TVIX $C $BAC $GM $GE $FB #STOCKMARKET https://t.co/E5954RIpC7 +Terrible $AAPL quarter, finally. On the way to becoming $NOK. Tech is mean reverting, today's leaders are almost always tomorrow's laggards. +The iPhone 7S could look radically different from the iPhones of today https://t.co/eQxUMAZ4eM $AAPL https://t.co/HIH3QqKpIC +"No Bull: The Evidence +https://t.co/Md2SNpjdwd +$SPX $MSFT $GOOGL $AAPL $NFLX $AMZN $FB $DIS $V $BAC $GS $WMT $SBUX https://t.co/1oISHNX4cJ" +The iPhone 7S could look radically different from the iPhones of today https://t.co/KgeVSjmcGe $AAPL https://t.co/7hFtg37oJu +There was a 3rd Apple founder, Ronald Wayne, who sold his 10% stake for $800 in 1976. Today his share would've been worth $65 Billion. $AAPL +Twitter Stock Set to Breakout Soon https://t.co/u4V6ChhpOW $TWTR $DIA $QQQ $SPY $AAPL $GLD $GDX $NUGT $DUST $BAC $GOOG $FB $STUDY $NFLX $IBM +Alibaba Stock Price Breaks The 50 Day Moving Average https://t.co/ABOVWI6j2G $BABA $AAPL $YHOO $COST $UWTI $CSC $MON https://t.co/VlWGDxrQXh +I still canât shake the feeling that $AAPL is slowly taking themselves private. https://t.co/XIAMvppDWh https://t.co/kdMGCGbMaJ +$SPX ROADMAP 2016 #STOCKMARKET $INTC $F $SPY $AAPL $AMZN $C $VIX $FB $TWTR $GOOGL $UVXY $FAZ $FEZ $MSFT $GS $BAC $AA https://t.co/owuQ9awcDw +"Want to know why $GOOG is so impressive and why $AAPL is so fucked? Read this years founders' letter from $GOOG: + +https://t.co/LiBjGZwyKw" +"GET READY. Here are the companies reporting earnings next week: https://t.co/NXptPkQX70 + +$AAPL $FB $TWTR $CMG $GILD https://t.co/tcIoCZdOZi" +$SPX THIS TIME IT'S DIFFERENT! $SPY $DIA $SDOW $S $FAZ $FEZ $AAPL $MSFT $BAC $C $JPM $GS $SIRI $AMZN $F $VIX $TVIX https://t.co/pkYVgNKv3P +$SPX ROADMAP 2016 #STOCKS $TVIX $VXX $VALE $AAPL $AKS $FCX $MSFT $AA $MU $VIX $SPX $SPY #TRADING $PCLN $SIRI $ MCD https://t.co/6UH5He38h1 +The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/s8iQOvPQeR $AAPL https://t.co/QmQROtQ9vY +11/ For example, buy an Echo and see your behavior change. The future is happening, and $AAPL seems, to me, asleep. +$RLYP $SPY $KORS $WDAY $MSFT $AAPL $QLIK $TIVO $NXPI $CPXX $AVGO $ZOES $LE $TICC $SLB $FCEL $VRA $MLNX $ASNA $ICPT https://t.co/LXMpz4rFG0 +#STOCKMARKET GRAVITY LESSONS: what goes up must come down $SPX $SPY $DIA $QQQ $TVIX $VIX $AAPL $C $FB $PCLN $BAC $F https://t.co/8HQHBEgSj5 +Should Icahn's exit or Buffett's entry affect your $AAPL judgment? The Big Name Effect. https://t.co/9Z2ok61MUh https://t.co/udAQLfdJFe +Apple revenue drops 13 percent, ending 13 years of growth. Greater China was especially weak, down 26 percent. $AAPL https://t.co/q4ovXUenBU +It was a $18 billion day for Apple. https://t.co/iRbGeoTmCJ $AAPL +"Apple has $233 billion in cash. It could buy: + +Uber +Tesla +Twitter +Airbnb +Netflix +Yahoo + +...and still have $18 billion left. $AAPL" +#3 TOP 2111.05 #STOCKS #STOCKMARKET #TRADING $SPX $SPY $VIX $TVIX $AAPL $SIRI $C $BAC $JPM $AMZN $MSFT $FB $TWTR $F https://t.co/gSqmN0fVON +Google #IO16: Android's failure to innovate hands a Apple free run at WWDC $GOOG $AAPL https://t.co/FTs9M8JD5g https://t.co/20uou1gUkW +$SPX 2134.72..2116.48...2111.05 HOUSTON WE HAVE A PROBLEM ! #STOCKMARKET $VIX $SPY $DIA $AAPL $C $BAC $FB $VXX $MSFT https://t.co/du3QfPUM4Q +top #earnings $FB $AAPL $AMZN $TWTR $CMG $F $GILD $LNKD $FCX $CELG $SWKS $JBLU $T $NXPI $BA https://t.co/lObOE0uRjZ https://t.co/94F6GJc3hE +The iPhone 6S is the first iPhone ever to sell fewer models than its predecessor https://t.co/ZVeQ9a4Yrh $AAPL https://t.co/2Ntpbxwlyo +You do not want to miss this incredibly candid look into $AAPL w/ @tim_cook! Tune into @MadMoneyOnCNBC on @CNBC now! https://t.co/budv4qfvju +Foxconn axes 60,000 jobs in one Chinese factory as robots take over: https://t.co/BnFdjGCmLf $AAPL https://t.co/WhRHer8jdN +Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/nXmvK6PV7M https://t.co/MAcMz0iTg6 +Apple is about to report its worst quarter in 13 years on Tuesday https://t.co/NJ3hwunHCx $AAPL https://t.co/YLTmnpqNjI +Everyone who wants an iPhone has one. $AAPL is now a consumer staple stock and will trade on replacement / shareholder yield. +Financial Armageddonâ is imminent, the next major crash will happen in 2016 $VXX $VIX $TVIX $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F +"Apple is NO longer the largest US stock by market cap. Google is: https://t.co/i81Y83jQJC + +$GOOGL $AAPL https://t.co/cRCKRYBICS" +Exclusive: Apple hires former Tesla VP Chris Porritt for âspecial [car] projectâ https://t.co/7knsloxvJW $TSLA $AAPL https://t.co/X8cYztExoP +$SPX on the top of downtrend Channel Be careful! #STOCKMARKET $SPY $AAPL $AMZN $TSLA $FB $QQQ $DIA $NFLX $PCLN $C $F https://t.co/UKZCyLYuBq +UPDATE: Apple CEO Cook says in conference call that smartphone marker is 'currently not growing' $AAPL https://t.co/WeECmrdv1j +In February Charlie Munger was asked why Berkshire owns $GM. The $AAPL stake isn't anymore complicated than this: https://t.co/Rwkb30OEgq +Talking to @SquawkStreet about $AAPL & more at @NYSE ! https://t.co/m05b68VLMp +iPhone sales sour #Apple's earning: https://t.co/962fj9SWsc $AAPL https://t.co/nz9FRK6sNK +People arenât upgrading smartphones as quickly and that is bad for Apple https://t.co/EOEJPfNR8Z ð $AAPL +"$NXPI $JBLU $FCX $AAPL $CMG +$TWTR $EBAY $BWLD $PNRA $CRUS +$FB $FSLR $UPS $CELG $AMZN +$LNKD $BIDU $SWKS $GILD $HELE https://t.co/rQUmhHgYn0" +People mad that Icahn sold $AAPL without giving them the headâs up - How much in commissions did you pay him this year? +Cool stat: $AAPL's $46 billion loss in market cap overnight is greater than the market cap of 391 S&P 500 companies https://t.co/1ms1YZzTbP +Apple. You've come a long way... https://t.co/WGvk8K8MYv $AAPL https://t.co/3Wo0hAwRAc +"Someone is building the Internet's biggest list of stock market cliches and it's amazing: https://t.co/mIV169cF36 + +$SPY $AAPL $EURUSD" +JUST IN: Apple delays earnings release by one day, to April 26th after the bell. ⢠$AAPL +Apple's market value is down nearly three Twitters $AAPL $TWTR +Trump warns of a tech bubble: https://t.co/6Ks1yTa4Zc $AAPL $FB $AMZN He's 100% right about this. https://t.co/dJgTLk5JOB +Apple could sell its billionth iPhone in just a few months' time https://t.co/g6VYDFIE3d $AAPL https://t.co/jzucmxDYXe +$SPX KEEP BLOWING #STOCKMARKET #BUBBLE #STOCKS $MSFT $GS $AAPL $SPY $DIA $DJI $C $SIRI $PCLN $BAC $JPM $VIX $TVIX https://t.co/GPFBb0uCLF +Will Apple $AAPL fall from tree? 12-mo descending triangle. I've no interest to short it, but it will be wild ride https://t.co/AnjsIKmIHI +Tim Cook shouldn't be doing TV w/out a new product. Looks desperate. Not a consumer-facing guy. $AAPL https://t.co/Z4UFSimTLg +When will Apple will sell its billionth iPhone? It may be sooner than you think: https://t.co/5IaF018N1p $AAPL https://t.co/cCIgtKqWHA +#Stockmarket downtrend continues next week $spx $spy $vix $tvix $dji $aapl $jpm $bac $c $msft $pcln $wmt $ba https://t.co/1TTlgnKnZc +$AAPL https://t.co/AFANPYHnoq +40 years ago this month, Apple co-founder Ronald Wayne sold his 10% stake in $AAPL for $800. Current value: $61 billion. +Warren Buffett's Berkshire Hathaway reports 9.8M share stake in $AAPL https://t.co/rXWwuyIooI https://t.co/TztgKCcWWy +Apple's iBooks and iTunes Movies in China have been shut down after less than 7 months https://t.co/ZuGXZqSHma $AAPL https://t.co/1OHGC9YiUf +Possible buy on $AAPL as it drops onto it's 9 DEMA support #1Broker #Bitcoin #Blockchain https://t.co/WWssD01joh https://t.co/jOKJyG9EaJ +"Apple is down 7% after earnings. + +That's about $40 BILLION in market cap gone in 30 minutes. Poof. + +$AAPL: https://t.co/ggfmPjJjkW" +B4 CRASH 2008 - Paulson's speech:" OUR FINANCIAL SYSTEM IS STRONG" $VXX $VIX $TVIX $UVXY $SPX $SPY $AAPL $MSFT $BAC $C $FB $DJI $DIA $F +$ONCI is ready to RUN this week! #stockmarket #pennystocks #parabolic $CDNL $MGT $GOOGL $AAPL $TSLA $TWTR $ONCI https://t.co/wwqf0RNOix +Apple could sell its billionth iPhone in just a few months' time https://t.co/u2qFZ440dH $AAPL https://t.co/8cAchiZ0vC +The iPhone might radically change in 2017 $AAPL https://t.co/IXLdCfEdus https://t.co/GpdMvFZPjE +"The growth of smartphones. On one graph. + +A great share via: https://t.co/2hAJlarjSM + +$AAPL $GOOGL $MSFT https://t.co/BAwQRvYzou" +"$AAPL finished last quarter with $232 billion in cash, meanwhile Kanye running up debts making records for Tidal. + +Bro." +Which is bullish for $AAPL if you know anything about $GS https://t.co/WWssD01joh https://t.co/CQk8iKMI7w +"The tech stocks with the MOST revenue + +1. $AAPL +2. $AMZN +3. $MSFT + +Visual by @OphirGottlieb: https://t.co/GpZ5ct2z5r https://t.co/H6sNKdtBHd" +
