http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java new file mode 100644 index 0000000..31337c1 --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/OutputCollectorImpl.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.mroperator; + +import java.io.IOException; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.serializer.Deserializer; +import org.apache.hadoop.io.serializer.SerializationFactory; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.mapred.OutputCollector; +import com.datatorrent.lib.util.KeyHashValPair; + +/** + * <p>OutputCollectorImpl class.</p> + * + * @since 0.9.0 + */ +@SuppressWarnings("unchecked") +public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V> +{ + private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class); + + private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>(); + + public List<KeyHashValPair<K, V>> getList() + { + return list; + } + + private transient SerializationFactory serializationFactory; + private transient Configuration conf = null; + + public OutputCollectorImpl() + { + conf = new Configuration(); + serializationFactory = new SerializationFactory(conf); + + } + + private <T> T cloneObj(T t) throws IOException + { + Serializer<T> keySerializer; + Class<T> keyClass; + PipedInputStream pis = new PipedInputStream(); + PipedOutputStream pos = new PipedOutputStream(pis); + keyClass = (Class<T>)t.getClass(); + keySerializer = serializationFactory.getSerializer(keyClass); + keySerializer.open(pos); + keySerializer.serialize(t); + Deserializer<T> keyDesiralizer = serializationFactory.getDeserializer(keyClass); + keyDesiralizer.open(pis); + T clonedArg0 = keyDesiralizer.deserialize(null); + pos.close(); + pis.close(); + keySerializer.close(); + keyDesiralizer.close(); + return clonedArg0; + + } + + @Override + public void collect(K arg0, V arg1) throws IOException + { + if 
(conf == null) { + conf = new Configuration(); + serializationFactory = new SerializationFactory(conf); + } + list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1))); + } +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java new file mode 100644 index 0000000..cdc5ec9 --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReduceOperator.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.mroperator; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; +import com.datatorrent.lib.util.KeyHashValPair; + +/** + * <p>ReduceOperator class.</p> + * + * @since 0.9.0 + */ +@SuppressWarnings({ "deprecation", "unused" }) +public class ReduceOperator<K1, V1, K2, V2> implements Operator +{ + private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class); + + private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass; + private transient Reducer<K1, V1, K2, V2> reduceObj; + private transient Reporter reporter; + private OutputCollector<K2, V2> outputCollector; + private String configFile; + + public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass() + { + return reduceClass; + } + + public void setReduceClass(Class<? 
extends Reducer<K1, V1, K2, V2>> reduceClass) + { + this.reduceClass = reduceClass; + } + + public String getConfigFile() + { + return configFile; + } + + public void setConfigFile(String configFile) + { + this.configFile = configFile; + } + + private int numberOfMappersRunning = -1; + private int operatorId; + + public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>() + { + @Override + public void process(KeyHashValPair<Integer, Integer> tuple) + { + logger.info("processing {}", tuple); + if (numberOfMappersRunning == -1) { + numberOfMappersRunning = tuple.getValue(); + } else { + numberOfMappersRunning += tuple.getValue(); + } + + } + + }; + + public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>(); + private Map<K1, List<V1>> cacheObject; + public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>() + { + @Override + public void process(KeyHashValPair<K1, V1> tuple) + { + // logger.info("processing tupple {}",tuple); + List<V1> list = cacheObject.get(tuple.getKey()); + if (list == null) { + list = new ArrayList<V1>(); + list.add(tuple.getValue()); + cacheObject.put(tuple.getKey(), list); + } else { + list.add(tuple.getValue()); + } + } + + }; + + @Override + public void setup(OperatorContext context) + { + reporter = new ReporterImpl(ReporterImpl.ReporterType.Reducer, new Counters()); + if (context != null) { + operatorId = context.getId(); + } + cacheObject = new HashMap<K1, List<V1>>(); + outputCollector = new OutputCollectorImpl<K2, V2>(); + if (reduceClass != null) { + try { + reduceObj = reduceClass.newInstance(); + } catch (Exception e) { + logger.info("can't instantiate object {}", e.getMessage()); + throw new RuntimeException(e); + } + Configuration conf = new Configuration(); + InputStream stream = null; + if (configFile != null && 
configFile.length() > 0) { + logger.info("system /{}", configFile); + stream = ClassLoader.getSystemResourceAsStream("/" + configFile); + if (stream == null) { + logger.info("system {}", configFile); + stream = ClassLoader.getSystemResourceAsStream(configFile); + } + } + if (stream != null) { + logger.info("found our stream... so adding it"); + conf.addResource(stream); + } + reduceObj.configure(new JobConf(conf)); + } + + } + + @Override + public void teardown() + { + + } + + @Override + public void beginWindow(long windowId) + { + + } + + @Override + public void endWindow() + { + if (numberOfMappersRunning == 0) { + for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) { + try { + reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter); + } catch (IOException e1) { + logger.info(e1.getMessage()); + throw new RuntimeException(e1); + } + } + List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList(); + for (KeyHashValPair<K2, V2> e : list) { + output.emit(e); + } + list.clear(); + cacheObject.clear(); + numberOfMappersRunning = -1; + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java new file mode 100644 index 0000000..cfbb26e --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/ReporterImpl.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.mroperator; + +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.Counters.Counter; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.Reporter; + +/** + * <p>ReporterImpl class.</p> + * + * @since 0.9.0 + */ +public class ReporterImpl implements Reporter +{ + private Counters counters; + InputSplit inputSplit; + + public enum ReporterType + { + Mapper, Reducer + } + + private ReporterType typ; + + public ReporterImpl(final ReporterType kind, final Counters ctrs) + { + this.typ = kind; + this.counters = ctrs; + } + + @Override + public InputSplit getInputSplit() + { + if (typ == ReporterType.Reducer) { + throw new UnsupportedOperationException("Reducer cannot call getInputSplit()"); + } else { + return inputSplit; + } + } + + public void setInputSplit(InputSplit inputSplit) + { + this.inputSplit = inputSplit; + } + + @Override + public void incrCounter(Enum<?> key, long amount) + { + if (null != counters) { + counters.incrCounter(key, amount); + } + } + + @Override + public void incrCounter(String group, String counter, long amount) + { + if (null != counters) { + counters.incrCounter(group, counter, amount); + } + } + + @Override + public void setStatus(String status) + { + // do nothing. + } + + @Override + public void progress() + { + // do nothing. 
+ } + + @Override + public Counter getCounter(String group, String name) + { + Counters.Counter counter = null; + if (counters != null) { + counter = counters.findCounter(group, name); + } + + return counter; + } + + @Override + public Counter getCounter(Enum<?> key) + { + Counters.Counter counter = null; + if (counters != null) { + counter = counters.findCounter(key); + } + + return counter; + } + + public float getProgress() + { + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java new file mode 100644 index 0000000..31ce3a9 --- /dev/null +++ b/examples/mroperator/src/main/java/org/apache/apex/examples/mroperator/WordCount.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.mroperator; + +import java.io.IOException; +import java.util.Iterator; +import java.util.StringTokenizer; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.FileOutputFormat; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.Mapper; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; + +/** + * <p>WordCount class.</p> + * + * @since 0.9.0 + */ +@SuppressWarnings("deprecation") +public class WordCount +{ + + public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> + { + private static final IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException + { + String line = value.toString(); + StringTokenizer tokenizer = new StringTokenizer(line); + while (tokenizer.hasMoreTokens()) { + word.set(tokenizer.nextToken()); + output.collect(word, one); + } + } + } + + public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> + { + public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException + { + int sum = 0; + while (values.hasNext()) { + sum += values.next().get(); + } + output.collect(key, new IntWritable(sum)); + } + } + + public void run(String[] args) throws Exception + { + + JobConf conf = new JobConf(this.getClass()); + 
conf.setJobName("wordcount"); + + conf.setOutputKeyClass(Text.class); + conf.setOutputValueClass(IntWritable.class); + + conf.setMapperClass(Map.class); + conf.setCombinerClass(Reduce.class); + conf.setReducerClass(Reduce.class); + + conf.setInputFormat(TextInputFormat.class); + conf.setOutputFormat(TextOutputFormat.class); + + FileInputFormat.setInputPaths(conf, new Path(args[0])); + FileOutputFormat.setOutputPath(conf, new Path(args[1])); + + JobClient.runJob(conf); + } + + public static void main(String[] args) throws Exception + { + new WordCount().run(args); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/main/resources/META-INF/properties.xml b/examples/mroperator/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..5a56014 --- /dev/null +++ b/examples/mroperator/src/main/resources/META-INF/properties.xml @@ -0,0 +1,88 @@ +<?xml version="1.0"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+
+-->
+<configuration>
+  <!-- Log Count Example -->
+  <property>
+    <name>dt.application.LogsCountExample.operator.Mapper.dirName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.LogsCountExample.operator.Mapper.partitionCount</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.LogsCountExample.operator.Console.filePath</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.LogsCountExample.operator.Console.outputFileName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.LogsCountExample.operator.Reducer.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+  </property>
+
+  <!-- Word Count Example -->
+  <property>
+    <name>dt.application.WordCountExample.operator.Mapper.dirName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.WordCountExample.operator.Mapper.partitionCount</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.WordCountExample.operator.Console.filePath</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.WordCountExample.operator.Console.outputFileName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.WordCountExample.operator.Reducer.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+  </property>
+
+  <!-- Inverted Index Example -->
+  <property>
+    <name>dt.application.InvertedIndexExample.operator.Mapper.dirName</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.InvertedIndexExample.operator.Mapper.partitionCount</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.InvertedIndexExample.operator.Console.filePath</name>
+    <value></value>
+  </property>
+  <property>
+    <name>dt.application.InvertedIndexExample.operator.Console.outputFileName</name>
+    <value></value>
+  </property>
+  <!-- was a duplicate of the LogsCountExample entry; this section configures InvertedIndexExample -->
+  <property>
+    <name>dt.application.InvertedIndexExample.operator.Reducer.attr.PARTITIONER</name>
+    <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value>
+  </property>
+</configuration>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java
new file mode 100644
index 0000000..073d847
--- /dev/null
+++ b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/MapOperatorTest.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.apex.examples.mroperator; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.serializer.SerializationFactory; +import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TextInputFormat; + +import com.datatorrent.lib.testbench.CollectorTestSink; + +@SuppressWarnings("deprecation") +public class MapOperatorTest +{ + + private static Logger LOG = LoggerFactory.getLogger(MapOperatorTest.class); + + @Rule + public TestMeta testMeta = new TestMeta(); + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new MapOperator<LongWritable, Text, Text, IntWritable>()); + } + + public void testNodeProcessingSchema(MapOperator<LongWritable, Text, Text, IntWritable> oper) throws IOException + { + + CollectorTestSink sortSink = new CollectorTestSink(); + oper.output.setSink(sortSink); + + oper.setMapClass(WordCount.Map.class); + oper.setCombineClass(WordCount.Reduce.class); + oper.setDirName(testMeta.testDir); + oper.setConfigFile(null); + oper.setInputFormatClass(TextInputFormat.class); + + Configuration conf = new Configuration(); + JobConf jobConf = new JobConf(conf); + FileInputFormat.setInputPaths(jobConf, new Path(testMeta.testDir)); + TextInputFormat inputFormat = 
new TextInputFormat(); + inputFormat.configure(jobConf); + InputSplit[] splits = inputFormat.getSplits(jobConf, 1); + SerializationFactory serializationFactory = new SerializationFactory(conf); + Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass()); + keySerializer.open(oper.getOutstream()); + keySerializer.serialize(splits[0]); + oper.setInputSplitClass(splits[0].getClass()); + keySerializer.close(); + oper.setup(null); + oper.beginWindow(0); + oper.emitTuples(); + oper.emitTuples(); + oper.endWindow(); + oper.beginWindow(1); + oper.emitTuples(); + oper.endWindow(); + + Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size()); + for (Object o : sortSink.collectedTuples) { + LOG.debug(o.toString()); + } + LOG.debug("Done testing round\n"); + oper.teardown(); + } + + public static class TestMeta extends TestWatcher + { + public final String file1 = "file1"; + public String baseDir; + public String testDir; + + @Override + protected void starting(org.junit.runner.Description description) + { + String methodName = description.getMethodName(); + String className = description.getClassName(); + baseDir = "target/" + className; + testDir = baseDir + "/" + methodName; + try { + FileUtils.forceMkdir(new File(testDir)); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + createFile(testDir + "/" + file1, "1\n2\n3\n1\n2\n3\n"); + } + + private void createFile(String fileName, String data) + { + BufferedWriter output = null; + try { + output = new BufferedWriter(new FileWriter(new File(fileName))); + output.write(data); + } catch (IOException ex) { + throw new RuntimeException(ex); + } finally { + if (output != null) { + try { + output.close(); + } catch (IOException ex) { + LOG.error("not able to close the output stream: ", ex); + } + } + } + } + + @Override + protected void finished(Description description) + { + try { + FileUtils.deleteDirectory(new File(baseDir)); + } catch (IOException ex) { + throw 
new RuntimeException(ex); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java new file mode 100644 index 0000000..bff982a --- /dev/null +++ b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/ReduceOperatorTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.mroperator; + +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; + +import com.datatorrent.lib.testbench.CollectorTestSink; +import com.datatorrent.lib.util.KeyHashValPair; + +public class ReduceOperatorTest +{ + private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class); + + /** + * Test node logic emits correct results + */ + @Test + public void testNodeProcessing() throws Exception + { + testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper) + { + oper.setReduceClass(WordCount.Reduce.class); + oper.setConfigFile(null); + oper.setup(null); + + CollectorTestSink sortSink = new CollectorTestSink(); + oper.output.setSink(sortSink); + + oper.beginWindow(0); + oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1)); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); + oper.endWindow(); + + oper.beginWindow(1); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); + oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); + oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1)); + oper.endWindow(); + Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size()); + for (Object o : sortSink.collectedTuples) { + logger.debug(o.toString()); 
+ } + logger.debug("Done testing round\n"); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java new file mode 100644 index 0000000..fccfa3b --- /dev/null +++ b/examples/mroperator/src/test/java/org/apache/apex/examples/mroperator/WordCountMRApplicationTest.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.mroperator; + +import java.io.File; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.LocalMode; + +public class WordCountMRApplicationTest +{ + private static Logger LOG = LoggerFactory.getLogger(WordCountMRApplicationTest.class); + @Rule + public MapOperatorTest.TestMeta testMeta = new MapOperatorTest.TestMeta(); + + @Test + public void testSomeMethod() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.set("dt.application.WordCountExample.operator.Mapper.dirName", testMeta.testDir); + conf.setInt("dt.application.WordCountExample.operator.Mapper.partitionCount", 1); + conf.set("dt.application.WordCountExample.operator.Console.filePath", testMeta.testDir); + conf.set("dt.application.WordCountExample.operator.Console.outputFileName", "output.txt"); + lma.prepareDAG(new NewWordCountApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.setHeartbeatMonitoringEnabled(false); + lc.run(5000); + lc.shutdown(); + List<String> readLines = FileUtils.readLines(new File(testMeta.testDir + "/output.txt")); + Map<String,Integer> readMap = Maps.newHashMap(); + Iterator<String> itr = readLines.iterator(); + while (itr.hasNext()) { + String[] splits = itr.next().split("="); + readMap.put(splits[0],Integer.valueOf(splits[1])); + } + Map<String,Integer> expectedMap = Maps.newHashMap(); + expectedMap.put("1",2); + expectedMap.put("2",2); + expectedMap.put("3",2); + Assert.assertEquals("expected reduced data ", expectedMap, readMap); + LOG.info("read lines {}", readLines); + } + +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/mroperator/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/mroperator/src/test/resources/log4j.properties b/examples/mroperator/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/mroperator/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pi/pom.xml b/examples/pi/pom.xml new file mode 100644 index 0000000..6c4935f --- /dev/null +++ b/examples/pi/pom.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>malhar-examples-pi</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar Pi Example</name> + <description>Apex example applications that calculate the value of Pi. This is a starting point to understand how Apex works.</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <dependencies> + <dependency> + <groupId>it.unimi.dsi</groupId> + <artifactId>fastutil</artifactId> + <version>6.6.4</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/pi/src/assemble/appPackage.xml b/examples/pi/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/examples/pi/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. 
The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java ---------------------------------------------------------------------- diff --git 
a/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java new file mode 100644 index 0000000..45f2b37 --- /dev/null +++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/Application.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.pi; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.testbench.RandomEventGenerator; + +/** + * Monte Carlo PI estimation Example : <br> + * This application computes value of PI using Monte Carlo pi estimation + * formula. 
+ * <p> + * Running Java Test or Main app in IDE: + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see something like the + * following output on the console (since the input sequence of random numbers + * can vary from one run to the next, there will be some variation in the + * output values): + * + * <pre> + * 3.1430480549199085 + * 3.1423454157782515 + * 3.1431377245508982 + * 3.142078799249531 + * 2013-06-18 10:43:18,335 [main] INFO stram.StramLocalCluster run - Application finished. + * </pre> + * + * Application DAG : <br> + * <img src="doc-files/Application.gif" width=600px > <br> + * <br> + * + * Streaming Window Size : 1000 ms(1 Sec) <br> + * Operator Details : <br> + * <ul> + * <li><b>The rand Operator : </b> This operator generates random integer + * between 0-30k. <br> + * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br> + * StateFull : No</li> + * <li><b>The calc operator : </b> This operator computes value of pi using + * monte carlo estimation. <br> + * Class : PiCalculateOperator <br> + * StateFull : No</li> + * <li><b>The operator Console: </b> This operator just outputs the input tuples + * to the console (or stdout). 
You can use other output adapters if needed.<br> + * </li> + * </ul> + * + * @since 0.3.2 + */ +@ApplicationAnnotation(name = "PiExample") +public class Application implements StreamingApplication +{ + // Locality passed to setLocality() on both streams below; it is null here. + // NOTE(review): null presumably leaves the platform default in place - confirm. + private final Locality locality = null; + + /** + * Adds the "rand", "picalc" and "console" operators to the DAG and connects them: + * rand.integer_data feeds calc.input, and calc.output feeds console.input. + * + * @param dag the DAG to populate + * @param conf the application configuration; not read by this method + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); + PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator()); + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality); + // NOTE(review): despite its name, the "rand_console" stream carries calc.output, not the output of rand. + dag.addStream("rand_console",calc.output, console.input).setLocality(locality); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java new file mode 100644 index 0000000..fbd196a --- /dev/null +++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationAppData.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.pi; + +import java.net.URI; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.Operator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; +import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; +import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; +import com.datatorrent.lib.testbench.RandomEventGenerator; + +/** + * Monte Carlo PI estimation example : <br> + * This application computes value of PI using Monte Carlo pi estimation + * formula. + * <p> + * Very similar to PiExample but data is also written to an App Data operator for visualization. + * <p> + * Running Java Test or Main app in IDE: + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see something like the + * following output on the console (since the input sequence of random numbers + * can vary from one run to the next, there will be some variation in the + * output values): + * + * <pre> + * 3.1430480549199085 + * 3.1423454157782515 + * 3.1431377245508982 + * 3.142078799249531 + * 2013-06-18 10:43:18,335 [main] INFO stram.StramLocalCluster run - Application finished. 
+ * </pre> + * + * Application DAG : <br> + * <img src="doc-files/Application.gif" width=600px > <br> + * <br> + * + * Streaming Window Size : 1000 ms(1 Sec) <br> + * Operator Details : <br> + * <ul> + * <li><b>The rand Operator : </b> This operator generates random integer + * between 0-30k. <br> + * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br> + * StateFull : No</li> + * <li><b>The calc operator : </b> This operator computes value of pi using + * monte carlo estimation. <br> + * Class : PiCalculateOperator <br> + * StateFull : No</li> + * <li><b>The operator Console: </b> This operator just outputs the input tuples + * to the console (or stdout). You can use other output adapters if needed.<br> + * </li> + * </ul> + * + * @since 0.3.2 + */ +@ApplicationAnnotation(name = "PiExampleAppData") +public class ApplicationAppData implements StreamingApplication +{ + public static final String SNAPSHOT_SCHEMA = "PiExampleDataSchema.json"; + + private final Locality locality = null; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); + PiCalculateOperator calc = dag.addOperator("picalc", new PiCalculateOperator()); + + + dag.addStream("rand_calc", rand.integer_data, calc.input).setLocality(locality); + + String gatewayAddress = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS); + + if (StringUtils.isEmpty(gatewayAddress)) { + throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS"); + } + + URI uri = URI.create("ws://" + gatewayAddress + "/pubsub"); + + AppDataSnapshotServerMap snapshotServer = dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap()); + + String snapshotServerJSON = SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA); + + snapshotServer.setSnapshotSchemaJSON(snapshotServerJSON); + + PubSubWebSocketAppDataQuery wsQuery = new PubSubWebSocketAppDataQuery(); + wsQuery.enableEmbeddedMode(); + 
snapshotServer.setEmbeddableQueryInfoProvider(wsQuery); + + PubSubWebSocketAppDataResult wsResult = dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult()); + + wsQuery.setUri(uri); + wsResult.setUri(uri); + Operator.InputPort<String> queryResultPort = wsResult.input; + + NamedValueList<Object> adaptor = dag.addOperator("adaptor", new NamedValueList<Object>()); + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + + dag.addStream("PiValues", calc.output, adaptor.inPort, console.input).setLocality(locality);; + dag.addStream("NamedPiValues", adaptor.outPort, snapshotServer.input); + dag.addStream("Result", snapshotServer.queryResult, queryResultPort); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java new file mode 100644 index 0000000..8a9cc50 --- /dev/null +++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/ApplicationWithScript.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.pi; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.script.JavaScriptOperator; +import com.datatorrent.lib.stream.RoundRobinHashMap; +import com.datatorrent.lib.testbench.RandomEventGenerator; + +/** + * Monte Carlo PI estimation example : <br> + * This application computes value of PI using Monte Carlo pi estimation + * formula. This example inputs formula using java script operator. + * + * Running Java Test or Main app in IDE: + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see following output on + * console: + * <pre> + * 2013-06-25 11:44:25,842 [container-2] DEBUG stram.StramChildAgent updateOperatorStatus - container-2 pendingDeploy [] + * 2013-06-25 11:44:25,929 [ServerHelper-1-1] DEBUG netlet.AbstractClient send - allocating new sendBuffer4Offers of size 16384 for Server.Subscriber{type=rrhm_calc/3.inBindings, mask=0, partitions=null} + * 3.16 + * 3.15 + * 3.1616 + * 3.148 + * 3.1393846153846154 + * </pre> + * + * * Application DAG : <br> + * <img src="doc-files/ApplicationScript.gif" width=600px > <br> + * <br> + * + * Streaming Window Size : 1000 ms(1 Sec) <br> + * Operator Details : <br> + * <ul> + * <li><b>The rand Operator : </b> This operator generates random integer + * between 0-30k. <br> + * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator} <br> + * StateFull : No</li> + * <li><b>The rrhm Operator : </b> This operator takes input from random generator + * creates tuples of (x,y) in round robin fashion. 
<br> + * Class : {@link com.datatorrent.lib.stream.RoundRobinHashMap} <br> + * StateFull : Yes, tuple is emitted after (x, y) values have been aggregated.</li> + * <li><b>The calc operator : </b> This is a JavaScript operator implementing the pi() estimation function. <br> + * Class : {@link com.datatorrent.lib.script.JavaScriptOperator} <br> + * StateFull : No</li> + * <li><b>The operator Console: </b> This operator just outputs the input tuples + * to the console (or stdout). User can use any output adapter.</li> + * </ul> + * + * @since 0.3.2 + */ +@ApplicationAnnotation(name = "PiJavaScriptExample") +public class ApplicationWithScript implements StreamingApplication +{ + + /** + * Adds the "rand", "rrhm", "picalc" and "console" operators and connects them in that order. + * The estimate itself is computed by the JavaScript function pi() installed on the calc operator. + * + * @param dag the DAG to populate + * @param conf the application configuration; not read by this method + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // upper bound for the generated values; its square is embedded in the script below + int maxValue = 30000; + + RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); + rand.setMinvalue(0); + rand.setMaxvalue(maxValue); + + // the generated integers are keyed as "x" and "y" before being handed to the script + RoundRobinHashMap<String,Object> rrhm = dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>()); + rrhm.setKeys(new String[]{"x", "y"}); + + JavaScriptOperator calc = dag.addOperator("picalc", new JavaScriptOperator()); + calc.setPassThru(false); + // i and count are the script variables that pi() increments + calc.put("i",0); + calc.put("count",0); + calc.addSetupScript("function pi() { if (x*x+y*y <= " + maxValue * maxValue + ") { i++; } count++; return i / count * 4; }"); + + calc.setInvoke("pi"); + + dag.addStream("rand_rrhm", rand.integer_data, rrhm.data); + dag.addStream("rrhm_calc", rrhm.map, calc.inBindings); + + ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator()); + // NOTE(review): despite its name, the "rand_console" stream carries calc.result, not the output of rand. + dag.addStream("rand_console",calc.result, console.input); + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java new file mode
100644 index 0000000..672a931 --- /dev/null +++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/Calculator.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.pi; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; +import com.datatorrent.lib.math.Division; +import com.datatorrent.lib.math.LogicalCompareToConstant; +import com.datatorrent.lib.math.MultiplyByConstant; +import com.datatorrent.lib.math.RunningAverage; +import com.datatorrent.lib.math.Sigma; +import com.datatorrent.lib.math.SquareCalculus; +import com.datatorrent.lib.stream.AbstractAggregator; +import com.datatorrent.lib.stream.ArrayListAggregator; +import com.datatorrent.lib.stream.Counter; +import com.datatorrent.lib.testbench.RandomEventGenerator; + +/** + * Pi estimation assembled only from library operators: the generated values are squared, paired, + * summed and compared against 30000 * 30000; two counters feed a division whose quotient is + * multiplied by 4, averaged and written to the console. + * + * @since 0.3.2 + */ +@ApplicationAnnotation(name = "PiLibraryExample") +public class Calculator implements StreamingApplication +{ + /** + * Adds the library operators to the DAG and connects them into the pipeline described on the class. + * + * @param dag the DAG to populate + * @param conf the application configuration; not read by this method + */ + @Override + public void populateDAG(DAG dag, Configuration conf) + { +
/* keep generating random values between 0 and 30000 */ + /* NOTE(review): no min/max is set on the generator here - presumably the range comes from configuration; confirm */ + RandomEventGenerator xyGenerator = dag.addOperator("GenerateX", RandomEventGenerator.class); + + /* calculate square of each of the values it receives */ + SquareCalculus squareOperator = dag.addOperator("SquareX", SquareCalculus.class); + + /* pair the consecutive values */ + AbstractAggregator<Integer> pairOperator = dag.addOperator("PairXY", new ArrayListAggregator<Integer>()); + Sigma<Integer> sumOperator = dag.addOperator("SumXY", new Sigma<Integer>()); + LogicalCompareToConstant<Integer> comparator = dag.addOperator("AnalyzeLocation", new LogicalCompareToConstant<Integer>()); + comparator.setConstant(30000 * 30000); + /* inCircle counts the tuples emitted on comparator.greaterThan, inSquare counts every sum */ + Counter inCircle = dag.addOperator("CountInCircle", Counter.class); + Counter inSquare = dag.addOperator("CountInSquare", Counter.class); + Division division = dag.addOperator("Ratio", Division.class); + MultiplyByConstant multiplication = dag.addOperator("InstantPI", MultiplyByConstant.class); + multiplication.setMultiplier(4); + RunningAverage average = dag.addOperator("AveragePI", new RunningAverage()); + ConsoleOutputOperator oper = dag.addOperator("Console", new ConsoleOutputOperator()); + + dag.addStream("x", xyGenerator.integer_data, squareOperator.input); + dag.addStream("sqr", squareOperator.integerResult, pairOperator.input); + dag.addStream("x2andy2", pairOperator.output, sumOperator.input); + dag.addStream("x2plusy2", sumOperator.integerResult, comparator.input, inSquare.input); + dag.addStream("inCirclePoints", comparator.greaterThan, inCircle.input); + dag.addStream("numerator", inCircle.output, division.numerator); + dag.addStream("denominator", inSquare.output, division.denominator); + dag.addStream("ratio", division.doubleQuotient, multiplication.input); + dag.addStream("instantPi", multiplication.doubleProduct, average.input); + dag.addStream("averagePi", average.doubleAverage, oper.input); + + } + +}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java new file mode 100644 index 0000000..aef8a0c --- /dev/null +++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/NamedValueList.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.pi; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * <p>An operator which converts a raw value to a named value singleton list.</p> + * AppDataSnapshotServerMap.input accepts a {@code List<Map<String,Object>>} so we use this operator to + * convert individual values to a singleton list of a named value. + * <p> + * @displayName Named Value + * @tags count + * @since 3.2.0 + */ +public class NamedValueList<T> extends BaseOperator +{ + // key under which every incoming value is stored in valueMap + @NotNull + private String valueName; + + // both are created in setup(); valueList is the singleton list wrapping valueMap + private List<Map<String, T>> valueList; + private Map<String, T> valueMap; + + /** + * Stores each incoming value under {@code valueName} and emits the singleton list. + * The same list and map instances are emitted for every tuple; only the stored value changes. + */ + public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>() + { + @Override + public void process(T val) + { + valueMap.put(valueName, val); + outPort.emit(valueList); + } + }; + + /** Emits the singleton list holding the name-to-value map. */ + public final transient DefaultOutputPort<List<Map<String, T>>> outPort = new DefaultOutputPort<>(); + + /** + * Creates the map with a null placeholder for {@code valueName} and wraps it in a singleton list. + */ + @Override + public void setup(OperatorContext context) + { + valueMap = new HashMap<>(); + valueMap.put(valueName, null); + valueList = Collections.singletonList(valueMap); + } + + @Override + public void teardown() + { + } + + @Override + public void beginWindow(long windowId) + { + } + + @Override + public void endWindow() + { + } + + /** + * @return the key under which values are stored + */ + public String getValueName() + { + return valueName; + } + + /** + * @param name the key under which values are stored + */ + public void setValueName(String name) + { + valueName = name; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java
b/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java new file mode 100644 index 0000000..b710a96 --- /dev/null +++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/PiCalculateOperator.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.examples.pi; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * This operator implements Monte Carlo estimation of pi. For points randomly distributed over a + * square, pi ~= (number of points inside the circle / total number of points) * 4.
+ * <p> + * Consecutive input tuples are paired into (x, y) points; a point counts as inside the circle when + * x * x + y * y <= base. The running estimate is emitted at the end of every window. + * + * @since 0.3.2 + */ +public class PiCalculateOperator extends BaseOperator +{ + // pending coordinates of the current point; -1 means "not received yet" + private transient int x = -1; + private transient int y = -1; + // threshold compared against x * x + y * y, set through setBase() + private int base; + // running counts of points inside the circle and of all points; never reset in this class + private long inArea = 0; + private long totalArea = 0; + /** + * Takes the first tuple of a pair as x and the second as y, then updates the counters. + */ + public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>() + { + @Override + public void process(Integer tuple) + { + if (x == -1) { + x = tuple; + } else { + y = tuple; + // widen to long: in int arithmetic the squares overflow for coordinates above 46340 + if ((long)x * x + (long)y * y <= base) { + inArea++; + } + totalArea++; + x = y = -1; + } + } + + }; + /** Emits the current pi estimate once per window. */ + public final transient DefaultOutputPort<Double> output = new DefaultOutputPort<Double>(); + + @Override + public void setup(OperatorContext context) + { + logger.info("inArea {} totalArea {}", inArea, totalArea); + } + + @Override + public void beginWindow(long windowId) + { + } + + /** + * Emits inArea / totalArea * 4, or nothing while no complete point has been processed. + */ + @Override + public void endWindow() + { + // 0.0 / 0 is NaN, so skip the emit until at least one point has been counted + if (totalArea > 0) { + output.emit((double)inArea / totalArea * 4); + } + } + + /** + * @param num the threshold compared against x * x + y * y + */ + public void setBase(int num) + { + base = num; + } + + /** + * @return the threshold compared against x * x + y * y + */ + public int getBase() + { + return base; + } + + private static final Logger logger = LoggerFactory.getLogger(PiCalculateOperator.class); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif new file mode 100644 index 0000000..9545c6c Binary files /dev/null and b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/Application.gif differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif
b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif new file mode 100644 index 0000000..29f9ef8 Binary files /dev/null and b/examples/pi/src/main/java/org/apache/apex/examples/pi/doc-files/ApplicationScript.gif differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java b/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java new file mode 100644 index 0000000..3ebad48 --- /dev/null +++ b/examples/pi/src/main/java/org/apache/apex/examples/pi/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Pi calculation demonstration application. 
+ */ +package org.apache.apex.examples.pi; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/resources/META-INF/properties.xml b/examples/pi/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..95df4c7 --- /dev/null +++ b/examples/pi/src/main/resources/META-INF/properties.xml @@ -0,0 +1,109 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + <!-- Memory settings for all examples --> + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1024</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.MEMORY_MB</name> + <value>256</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name> + <value>-Xmx128M</value> + </property> + <property> + <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> + <value>128</value> + </property> + + <!-- PiExample --> + <property> + <name>dt.application.PiExample.operator.rand.minvalue</name> + <value>0</value> + </property> + <property> + <name>dt.application.PiExample.operator.rand.maxvalue</name> + <value>30000</value> + </property> + <property> + <name>dt.application.PiExample.operator.picalc.base</name> + <value>900000000</value> + </property> + <property> + <name>dt.application.PiExample.operator.adaptor.valueName</name> + <value>piValue</value> + </property> + + <!-- PiExampleAppData --> + <property> + <name>dt.application.PiExampleAppData.operator.rand.minvalue</name> + <value>0</value> + </property> + <property> + <name>dt.application.PiExampleAppData.operator.rand.maxvalue</name> + <value>30000</value> + </property> + <property> + <name>dt.application.PiExampleAppData.operator.picalc.base</name> + <value>900000000</value> + </property> + <property> + <name>dt.application.PiExampleAppData.operator.adaptor.valueName</name> + <value>piValue</value> + </property> + <property> + <name>dt.application.PiExampleAppData.operator.Query.topic</name> + <value>PiExampleQuery</value> + </property> + <property> + <name>dt.application.PiExampleAppData.operator.QueryResult.topic</name> + <value>PiExampleQueryResult</value> + </property> + <property> + <name>dt.application.PiExampleAppData.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name> + <value>PiExampleQuery</value> + </property> + + <!-- PiLibraryExample --> + <property> + 
<name>dt.application.PiLibraryExample.operator.GenerateX.minvalue</name> + <value>0</value> + </property> + <property> + <name>dt.application.PiLibraryExample.operator.GenerateX.maxvalue</name> + <value>30000</value> + </property> + <property> + <name>dt.application.PiLibraryExample.operator.PairXY.size</name> + <value>2</value> + </property> + + <!-- PiJavaScriptExample --> + <property> + <name>dt.application.PiJavaScriptExample.operator.rand.tuplesBlast</name> + <value>9</value> + </property> + +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/resources/PiExampleDataSchema.json ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/resources/PiExampleDataSchema.json b/examples/pi/src/main/resources/PiExampleDataSchema.json new file mode 100644 index 0000000..47db8eb --- /dev/null +++ b/examples/pi/src/main/resources/PiExampleDataSchema.json @@ -0,0 +1,3 @@ +{ + "values": [{"name": "piValue", "type": "double"}] +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/main/resources/app/PiJsonExample.json ---------------------------------------------------------------------- diff --git a/examples/pi/src/main/resources/app/PiJsonExample.json b/examples/pi/src/main/resources/app/PiJsonExample.json new file mode 100644 index 0000000..b1e2972 --- /dev/null +++ b/examples/pi/src/main/resources/app/PiJsonExample.json @@ -0,0 +1,52 @@ +{ + "description": "Pi JSON Example that is intended to demonstrate the capability of specifying an app using JSON", + "operators": [ + { + "name": "rand", + "class": "com.datatorrent.lib.testbench.RandomEventGenerator", + "properties": { + "minvalue": 0, + "maxvalue": 30000 + } + }, + { + "name": "picalc", + "class": "com.datatorrent.examples.pi.PiCalculateOperator", + "properties": { + "base": 900000000 + } + }, + { + "name": "console", + "class": "com.datatorrent.lib.io.ConsoleOutputOperator" + } + ], + 
"streams": [ + { + "name": "rand_calc", + "source": { + "operatorName": "rand", + "portName": "integer_data" + }, + "sinks": [ + { + "operatorName": "picalc", + "portName": "input" + } + ] + }, + { + "name": "calc_console", + "source": { + "operatorName": "picalc", + "portName": "output" + }, + "sinks": [ + { + "operatorName": "console", + "portName": "input" + } + ] + } + ] +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java new file mode 100644 index 0000000..d71288f --- /dev/null +++ b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.pi; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * + */ +public class ApplicationTest +{ + @Test + public void testSomeMethod() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource("dt-site-pi.xml"); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java ---------------------------------------------------------------------- diff --git a/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java new file mode 100644 index 0000000..9551c52 --- /dev/null +++ b/examples/pi/src/test/java/org/apache/apex/examples/pi/ApplicationWithScriptTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.pi; + +import org.junit.Test; + +import com.datatorrent.api.LocalMode; + +/** + * + */ +public class ApplicationWithScriptTest +{ + @Test + public void testSomeMethod() throws Exception + { + LocalMode.runApp(new ApplicationWithScript(), 10000); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java ---------------------------------------------------------------------- diff --git a/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java b/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java new file mode 100644 index 0000000..34ad387 --- /dev/null +++ b/examples/pi/src/test/java/org/apache/apex/examples/pi/CalculatorTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.examples.pi; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * + */ +public class CalculatorTest +{ + @Test + public void testSomeMethod() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource("dt-site-pilibrary.xml"); + lma.prepareDAG(new Calculator(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/resources/dt-site-pi.xml ---------------------------------------------------------------------- diff --git a/examples/pi/src/test/resources/dt-site-pi.xml b/examples/pi/src/test/resources/dt-site-pi.xml new file mode 100644 index 0000000..6032400 --- /dev/null +++ b/examples/pi/src/test/resources/dt-site-pi.xml @@ -0,0 +1,40 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + <property> + <name>dt.application.PiExample.class</name> + <value>org.apache.apex.examples.pi.Application</value> + <description>An alias for the application</description> +</property> +<property> + <name>dt.application.PiExample.operator.rand.minvalue</name> + <value>0</value> +</property> +<property> + <name>dt.application.PiExample.operator.rand.maxvalue</name> + <value>30000</value> +</property> +<property> + <name>dt.application.PiExample.operator.picalc.base</name> + <value>900000000</value> +</property> +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/resources/dt-site-pilibrary.xml ---------------------------------------------------------------------- diff --git a/examples/pi/src/test/resources/dt-site-pilibrary.xml b/examples/pi/src/test/resources/dt-site-pilibrary.xml new file mode 100644 index 0000000..8f1ae8b --- /dev/null +++ b/examples/pi/src/test/resources/dt-site-pilibrary.xml @@ -0,0 +1,45 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. 
+ +--> +<configuration> + <property> + <name>dt.application.PiLibraryExample.class</name> + <value>org.apache.apex.examples.pi.Calculator</value> + <description>An alias for the application</description> +</property> +<property> + <name>dt.application.PiLibraryExample.operator.GenerateX.minvalue</name> + <value>0</value> +</property> +<property> + <name>dt.application.PiLibraryExample.operator.GenerateX.maxvalue</name> + <value>30000</value> +</property> +<property> + <name>dt.application.PiLibraryExample.operator.PairXY.size</name> + <value>2</value> +</property> +<!-- +<property> + <name>dt.application.PiLibraryExample.operator.AnalyzeLocation.constant</name> + <value>900000000</value> +</property> --> + +</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/examples/pi/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/pi/src/test/resources/log4j.properties b/examples/pi/src/test/resources/log4j.properties new file mode 100644 index 0000000..cf0d19e --- /dev/null +++ b/examples/pi/src/test/resources/log4j.properties @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.CONSOLE.threshold=${test.log.console.threshold} +test.log.console.threshold=DEBUG + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug +log4j.logger.org.apache.apex=debug
