http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java deleted file mode 100644 index 6c81724..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/OutputCollectorImpl.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.mroperator; - -import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.ArrayList; -import java.util.List; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.serializer.Deserializer; -import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.io.serializer.Serializer; -import org.apache.hadoop.mapred.OutputCollector; -import com.datatorrent.lib.util.KeyHashValPair; - -/** - * <p>OutputCollectorImpl class.</p> - * - * @since 0.9.0 - */ -@SuppressWarnings("unchecked") -public class OutputCollectorImpl<K extends Object, V extends Object> implements OutputCollector<K, V> -{ - private static final Logger logger = LoggerFactory.getLogger(OutputCollectorImpl.class); - - private List<KeyHashValPair<K, V>> list = new ArrayList<KeyHashValPair<K, V>>(); - - public List<KeyHashValPair<K, V>> getList() - { - return list; - } - - private transient SerializationFactory serializationFactory; - private transient Configuration conf = null; - - public OutputCollectorImpl() - { - conf = new Configuration(); - serializationFactory = new SerializationFactory(conf); - - } - - private <T> T cloneObj(T t) throws IOException - { - Serializer<T> keySerializer; - Class<T> keyClass; - PipedInputStream pis = new PipedInputStream(); - PipedOutputStream pos = new PipedOutputStream(pis); - keyClass = (Class<T>)t.getClass(); - keySerializer = serializationFactory.getSerializer(keyClass); - keySerializer.open(pos); - keySerializer.serialize(t); - Deserializer<T> keyDesiralizer = serializationFactory.getDeserializer(keyClass); - keyDesiralizer.open(pis); - T clonedArg0 = keyDesiralizer.deserialize(null); - pos.close(); - pis.close(); - keySerializer.close(); - keyDesiralizer.close(); - return clonedArg0; - - } - - @Override - public void collect(K arg0, V arg1) throws IOException - { - if (conf 
== null) { - conf = new Configuration(); - serializationFactory = new SerializationFactory(conf); - } - list.add(new KeyHashValPair<K, V>(cloneObj(arg0), cloneObj(arg1))); - } -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java deleted file mode 100644 index 5df9b0d..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReduceOperator.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.mroperator; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator; -import com.datatorrent.demos.mroperator.ReporterImpl.ReporterType; -import com.datatorrent.lib.util.KeyHashValPair; - -/** - * <p>ReduceOperator class.</p> - * - * @since 0.9.0 - */ -@SuppressWarnings({ "deprecation", "unused" }) -public class ReduceOperator<K1, V1, K2, V2> implements Operator -{ - private static final Logger logger = LoggerFactory.getLogger(ReduceOperator.class); - - private Class<? extends Reducer<K1, V1, K2, V2>> reduceClass; - private transient Reducer<K1, V1, K2, V2> reduceObj; - private transient Reporter reporter; - private OutputCollector<K2, V2> outputCollector; - private String configFile; - - public Class<? extends Reducer<K1, V1, K2, V2>> getReduceClass() - { - return reduceClass; - } - - public void setReduceClass(Class<? 
extends Reducer<K1, V1, K2, V2>> reduceClass) - { - this.reduceClass = reduceClass; - } - - public String getConfigFile() - { - return configFile; - } - - public void setConfigFile(String configFile) - { - this.configFile = configFile; - } - - private int numberOfMappersRunning = -1; - private int operatorId; - - public final transient DefaultInputPort<KeyHashValPair<Integer, Integer>> inputCount = new DefaultInputPort<KeyHashValPair<Integer, Integer>>() - { - @Override - public void process(KeyHashValPair<Integer, Integer> tuple) - { - logger.info("processing {}", tuple); - if (numberOfMappersRunning == -1) { - numberOfMappersRunning = tuple.getValue(); - } else { - numberOfMappersRunning += tuple.getValue(); - } - - } - - }; - - public final transient DefaultOutputPort<KeyHashValPair<K2, V2>> output = new DefaultOutputPort<KeyHashValPair<K2, V2>>(); - private Map<K1, List<V1>> cacheObject; - public final transient DefaultInputPort<KeyHashValPair<K1, V1>> input = new DefaultInputPort<KeyHashValPair<K1, V1>>() - { - @Override - public void process(KeyHashValPair<K1, V1> tuple) - { - // logger.info("processing tupple {}",tuple); - List<V1> list = cacheObject.get(tuple.getKey()); - if (list == null) { - list = new ArrayList<V1>(); - list.add(tuple.getValue()); - cacheObject.put(tuple.getKey(), list); - } else { - list.add(tuple.getValue()); - } - } - - }; - - @Override - public void setup(OperatorContext context) - { - reporter = new ReporterImpl(ReporterType.Reducer, new Counters()); - if (context != null) { - operatorId = context.getId(); - } - cacheObject = new HashMap<K1, List<V1>>(); - outputCollector = new OutputCollectorImpl<K2, V2>(); - if (reduceClass != null) { - try { - reduceObj = reduceClass.newInstance(); - } catch (Exception e) { - logger.info("can't instantiate object {}", e.getMessage()); - throw new RuntimeException(e); - } - Configuration conf = new Configuration(); - InputStream stream = null; - if (configFile != null && configFile.length() > 0) { 
- logger.info("system /{}", configFile); - stream = ClassLoader.getSystemResourceAsStream("/" + configFile); - if (stream == null) { - logger.info("system {}", configFile); - stream = ClassLoader.getSystemResourceAsStream(configFile); - } - } - if (stream != null) { - logger.info("found our stream... so adding it"); - conf.addResource(stream); - } - reduceObj.configure(new JobConf(conf)); - } - - } - - @Override - public void teardown() - { - - } - - @Override - public void beginWindow(long windowId) - { - - } - - @Override - public void endWindow() - { - if (numberOfMappersRunning == 0) { - for (Map.Entry<K1, List<V1>> e : cacheObject.entrySet()) { - try { - reduceObj.reduce(e.getKey(), e.getValue().iterator(), outputCollector, reporter); - } catch (IOException e1) { - logger.info(e1.getMessage()); - throw new RuntimeException(e1); - } - } - List<KeyHashValPair<K2, V2>> list = ((OutputCollectorImpl<K2, V2>)outputCollector).getList(); - for (KeyHashValPair<K2, V2> e : list) { - output.emit(e); - } - list.clear(); - cacheObject.clear(); - numberOfMappersRunning = -1; - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java deleted file mode 100644 index d2d38da..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/ReporterImpl.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.mroperator; - -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.Counters.Counter; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.Reporter; - -/** - * <p>ReporterImpl class.</p> - * - * @since 0.9.0 - */ -public class ReporterImpl implements Reporter -{ - private Counters counters; - InputSplit inputSplit; - - public enum ReporterType - { - Mapper, Reducer - } - - private ReporterType typ; - - public ReporterImpl(final ReporterType kind, final Counters ctrs) - { - this.typ = kind; - this.counters = ctrs; - } - - @Override - public InputSplit getInputSplit() - { - if (typ == ReporterType.Reducer) { - throw new UnsupportedOperationException("Reducer cannot call getInputSplit()"); - } else { - return inputSplit; - } - } - - public void setInputSplit(InputSplit inputSplit) - { - this.inputSplit = inputSplit; - } - - @Override - public void incrCounter(Enum<?> key, long amount) - { - if (null != counters) { - counters.incrCounter(key, amount); - } - } - - @Override - public void incrCounter(String group, String counter, long amount) - { - if (null != counters) { - counters.incrCounter(group, counter, amount); - } - } - - @Override - public void setStatus(String status) - { - // do nothing. - } - - @Override - public void progress() - { - // do nothing. 
- } - - @Override - public Counter getCounter(String group, String name) - { - Counters.Counter counter = null; - if (counters != null) { - counter = counters.findCounter(group, name); - } - - return counter; - } - - @Override - public Counter getCounter(Enum<?> key) - { - Counters.Counter counter = null; - if (counters != null) { - counter = counters.findCounter(key); - } - - return counter; - } - - public float getProgress() - { - return 0; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java b/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java deleted file mode 100644 index f78cf99..0000000 --- a/demos/mroperator/src/main/java/com/datatorrent/demos/mroperator/WordCount.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.mroperator; - -import java.io.IOException; -import java.util.Iterator; -import java.util.StringTokenizer; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; - -/** - * <p>WordCount class.</p> - * - * @since 0.9.0 - */ -@SuppressWarnings("deprecation") -public class WordCount -{ - - public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> - { - private static final IntWritable one = new IntWritable(1); - private Text word = new Text(); - - public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException - { - String line = value.toString(); - StringTokenizer tokenizer = new StringTokenizer(line); - while (tokenizer.hasMoreTokens()) { - word.set(tokenizer.nextToken()); - output.collect(word, one); - } - } - } - - public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> - { - public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException - { - int sum = 0; - while (values.hasNext()) { - sum += values.next().get(); - } - output.collect(key, new IntWritable(sum)); - } - } - - public void run(String[] args) throws Exception - { - - JobConf conf = new JobConf(this.getClass()); - 
conf.setJobName("wordcount"); - - conf.setOutputKeyClass(Text.class); - conf.setOutputValueClass(IntWritable.class); - - conf.setMapperClass(Map.class); - conf.setCombinerClass(Reduce.class); - conf.setReducerClass(Reduce.class); - - conf.setInputFormat(TextInputFormat.class); - conf.setOutputFormat(TextOutputFormat.class); - - FileInputFormat.setInputPaths(conf, new Path(args[0])); - FileOutputFormat.setOutputPath(conf, new Path(args[1])); - - JobClient.runJob(conf); - } - - public static void main(String[] args) throws Exception - { - new WordCount().run(args); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/main/resources/META-INF/properties.xml b/demos/mroperator/src/main/resources/META-INF/properties.xml deleted file mode 100644 index ce95ec9..0000000 --- a/demos/mroperator/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,88 +0,0 @@ -<?xml version="1.0"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <!-- Log Count Demo --> - <property> - <name>dt.application.LogsCountDemo.operator.Mapper.dirName</name> - <value></value> - </property> - <property> - <name>dt.application.LogsCountDemo.operator.Mapper.partitionCount</name> - <value></value> - </property> - <property> - <name>dt.application.LogsCountDemo.operator.Console.filePath</name> - <value></value> - </property> - <property> - <name>dt.application.LogsCountDemo.operator.Console.outputFileName</name> - <value></value> - </property> - <property> - <name>dt.application.LogsCountDemo.operator.Reducer.attr.PARTITIONER</name> - <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value> - </property> - - <!-- Word Count Demo --> - <property> - <name>dt.application.WordCountDemo.operator.Mapper.dirName</name> - <value></value> - </property> - <property> - <name>dt.application.WordCountDemo.operator.Mapper.partitionCount</name> - <value></value> - </property> - <property> - <name>dt.application.WordCountDemo.operator.Console.filePath</name> - <value></value> - </property> - <property> - <name>dt.application.WordCountDemo.operator.Console.outputFileName</name> - <value></value> - </property> - <property> - <name>dt.application.WordCountDemo.operator.Reducer.attr.PARTITIONER</name> - <value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value> - </property> - - <!-- Inverted Index Demo --> - <property> - <name>dt.application.InvertedIndexDemo.operator.Mapper.dirName</name> - <value></value> - </property> - <property> - <name>dt.application.InvertedIndexDemo.operator.Mapper.partitionCount</name> - <value></value> - </property> - <property> - <name>dt.application.InvertedIndexDemo.operator.Console.filePath</name> - <value></value> - </property> - <property> - <name>dt.application.InvertedIndexDemo.operator.Console.outputFileName</name> - <value></value> - </property> - <property> - <name>dt.application.LogsCountDemo.operator.Reducer.attr.PARTITIONER</name> - 
<value>com.datatorrent.common.partitioner.StatelessPartitioner:1</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/site/conf/my-app-conf1.xml ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/site/conf/my-app-conf1.xml b/demos/mroperator/src/site/conf/my-app-conf1.xml deleted file mode 100644 index f35873b..0000000 --- a/demos/mroperator/src/site/conf/my-app-conf1.xml +++ /dev/null @@ -1,27 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java deleted file mode 100644 index 0f330e8..0000000 --- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/MapOperatorTest.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.mroperator; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileWriter; -import java.io.IOException; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.io.serializer.Serializer; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.TextInputFormat; - -import com.datatorrent.lib.testbench.CollectorTestSink; - -@SuppressWarnings("deprecation") -public class MapOperatorTest -{ - - private static Logger LOG = LoggerFactory.getLogger(MapOperatorTest.class); - - @Rule - public TestMeta testMeta = new TestMeta(); - /** - * Test node logic emits correct results - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new MapOperator<LongWritable, Text, Text, IntWritable>()); - } - - public void testNodeProcessingSchema(MapOperator<LongWritable, Text, Text, IntWritable> oper) throws IOException - { - - CollectorTestSink sortSink = new CollectorTestSink(); - oper.output.setSink(sortSink); - - oper.setMapClass(WordCount.Map.class); - oper.setCombineClass(WordCount.Reduce.class); - oper.setDirName(testMeta.testDir); - oper.setConfigFile(null); - oper.setInputFormatClass(TextInputFormat.class); - - Configuration conf = new Configuration(); - JobConf jobConf = new JobConf(conf); - FileInputFormat.setInputPaths(jobConf, new Path(testMeta.testDir)); - TextInputFormat inputFormat = new 
TextInputFormat(); - inputFormat.configure(jobConf); - InputSplit[] splits = inputFormat.getSplits(jobConf, 1); - SerializationFactory serializationFactory = new SerializationFactory(conf); - Serializer keySerializer = serializationFactory.getSerializer(splits[0].getClass()); - keySerializer.open(oper.getOutstream()); - keySerializer.serialize(splits[0]); - oper.setInputSplitClass(splits[0].getClass()); - keySerializer.close(); - oper.setup(null); - oper.beginWindow(0); - oper.emitTuples(); - oper.emitTuples(); - oper.endWindow(); - oper.beginWindow(1); - oper.emitTuples(); - oper.endWindow(); - - Assert.assertEquals("number emitted tuples", 3, sortSink.collectedTuples.size()); - for (Object o : sortSink.collectedTuples) { - LOG.debug(o.toString()); - } - LOG.debug("Done testing round\n"); - oper.teardown(); - } - - public static class TestMeta extends TestWatcher - { - public final String file1 = "file1"; - public String baseDir; - public String testDir; - - @Override - protected void starting(org.junit.runner.Description description) - { - String methodName = description.getMethodName(); - String className = description.getClassName(); - baseDir = "target/" + className; - testDir = baseDir + "/" + methodName; - try { - FileUtils.forceMkdir(new File(testDir)); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - createFile(testDir + "/" + file1, "1\n2\n3\n1\n2\n3\n"); - } - - private void createFile(String fileName, String data) - { - BufferedWriter output = null; - try { - output = new BufferedWriter(new FileWriter(new File(fileName))); - output.write(data); - } catch (IOException ex) { - throw new RuntimeException(ex); - } finally { - if (output != null) { - try { - output.close(); - } catch (IOException ex) { - LOG.error("not able to close the output stream: ", ex); - } - } - } - } - - @Override - protected void finished(Description description) - { - try { - FileUtils.deleteDirectory(new File(baseDir)); - } catch (IOException ex) { - throw new 
RuntimeException(ex); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java deleted file mode 100644 index b85f8ad..0000000 --- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/ReduceOperatorTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.mroperator; - -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.Text; - -import com.datatorrent.lib.testbench.CollectorTestSink; -import com.datatorrent.lib.util.KeyHashValPair; - -public class ReduceOperatorTest -{ - private static Logger logger = LoggerFactory.getLogger(ReduceOperatorTest.class); - - /** - * Test node logic emits correct results - */ - @Test - public void testNodeProcessing() throws Exception - { - testNodeProcessingSchema(new ReduceOperator<Text, IntWritable,Text, IntWritable>()); - } - - @SuppressWarnings({ "rawtypes", "unchecked" }) - public void testNodeProcessingSchema(ReduceOperator<Text, IntWritable,Text, IntWritable> oper) - { - oper.setReduceClass(WordCount.Reduce.class); - oper.setConfigFile(null); - oper.setup(null); - - CollectorTestSink sortSink = new CollectorTestSink(); - oper.output.setSink(sortSink); - - oper.beginWindow(0); - oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, 1)); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); - oper.endWindow(); - - oper.beginWindow(1); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("one"), new IntWritable(1))); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); - oper.input.process(new KeyHashValPair<Text, IntWritable>(new Text("two"), new IntWritable(1))); - oper.inputCount.process(new KeyHashValPair<Integer, Integer>(1, -1)); - oper.endWindow(); - Assert.assertEquals("number emitted tuples", 2, sortSink.collectedTuples.size()); - for (Object o : sortSink.collectedTuples) { - logger.debug(o.toString()); - 
} - logger.debug("Done testing round\n"); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java b/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java deleted file mode 100644 index bd732c1..0000000 --- a/demos/mroperator/src/test/java/com/datatorrent/demos/mroperator/WordCountMRApplicationTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.mroperator; - -import java.io.File; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import org.junit.Assert; -import org.junit.Rule; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.Maps; - -import com.datatorrent.api.LocalMode; - -public class WordCountMRApplicationTest -{ - private static Logger LOG = LoggerFactory.getLogger(WordCountMRApplicationTest.class); - @Rule - public MapOperatorTest.TestMeta testMeta = new MapOperatorTest.TestMeta(); - - @Test - public void testSomeMethod() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.set("dt.application.WordCountDemo.operator.Mapper.dirName", testMeta.testDir); - conf.setInt("dt.application.WordCountDemo.operator.Mapper.partitionCount", 1); - conf.set("dt.application.WordCountDemo.operator.Console.filePath", testMeta.testDir); - conf.set("dt.application.WordCountDemo.operator.Console.outputFileName", "output.txt"); - lma.prepareDAG(new NewWordCountApplication(), conf); - LocalMode.Controller lc = lma.getController(); - lc.setHeartbeatMonitoringEnabled(false); - lc.run(5000); - lc.shutdown(); - List<String> readLines = FileUtils.readLines(new File(testMeta.testDir + "/output.txt")); - Map<String,Integer> readMap = Maps.newHashMap(); - Iterator<String> itr = readLines.iterator(); - while (itr.hasNext()) { - String[] splits = itr.next().split("="); - readMap.put(splits[0],Integer.valueOf(splits[1])); - } - Map<String,Integer> expectedMap = Maps.newHashMap(); - expectedMap.put("1",2); - expectedMap.put("2",2); - expectedMap.put("3",2); - Assert.assertEquals("expected reduced data ", expectedMap, readMap); - LOG.info("read lines {}", readLines); - } - -} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/mroperator/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/mroperator/src/test/resources/log4j.properties b/demos/mroperator/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/mroperator/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/pom.xml ---------------------------------------------------------------------- diff --git a/demos/pi/pom.xml b/demos/pi/pom.xml deleted file mode 100644 index 30c2dc1..0000000 --- a/demos/pi/pom.xml +++ /dev/null @@ -1,45 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>pi-demo</artifactId> - <packaging>jar</packaging> - - <name>Apache Apex Malhar Pi Demo</name> - <description>Apex demo applications that calculate the value of Pi. This is a starting point to understand how Apex works.</description> - - <parent> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-demos</artifactId> - <version>3.7.0-SNAPSHOT</version> - </parent> - - <dependencies> - <dependency> - <groupId>it.unimi.dsi</groupId> - <artifactId>fastutil</artifactId> - <version>6.6.4</version> - </dependency> - </dependencies> - -</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/pi/src/assemble/appPackage.xml b/demos/pi/src/assemble/appPackage.xml deleted file mode 100644 index 4138cf2..0000000 --- a/demos/pi/src/assemble/appPackage.xml +++ /dev/null @@ -1,59 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. 
You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. - ---> -<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> - <id>appPackage</id> - <formats> - <format>jar</format> - </formats> - <includeBaseDirectory>false</includeBaseDirectory> - <fileSets> - <fileSet> - <directory>${basedir}/target/</directory> - <outputDirectory>/app</outputDirectory> - <includes> - <include>${project.artifactId}-${project.version}.jar</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/target/deps</directory> - <outputDirectory>/lib</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/site/conf</directory> - <outputDirectory>/conf</outputDirectory> - <includes> - <include>*.xml</include> - </includes> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/META-INF</directory> - <outputDirectory>/META-INF</outputDirectory> - </fileSet> - <fileSet> - <directory>${basedir}/src/main/resources/app</directory> - <outputDirectory>/app</outputDirectory> - </fileSet> - </fileSets> - -</assembly> - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java deleted file mode 100644 index 
55ffe92..0000000 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Application.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.pi; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.testbench.RandomEventGenerator; - -/** - * Monte Carlo PI estimation demo : <br> - * This application computes value of PI using Monte Carlo pi estimation - * formula. 
- * <p> - * Running Java Test or Main app in IDE: - * - * <pre> - * LocalMode.runApp(new Application(), 600000); // 10 min run - * </pre> - * - * Run Success : <br> - * For successful deployment and run, user should see something like the - * following output on the console (since the input sequence of random numbers - * can vary from one run to the next, there will be some variation in the - * output values): - * - * <pre> - * 3.1430480549199085 - * 3.1423454157782515 - * 3.1431377245508982 - * 3.142078799249531 - * 2013-06-18 10:43:18,335 [main] INFO stram.StramLocalCluster run - Application finished. - * </pre> - * - * Application DAG : <br> - * <img src="doc-files/Application.gif" width=600px > <br> - * <br> - * - * Streaming Window Size : 1000 ms(1 Sec) <br> - * Operator Details : <br> - * <ul> - * <li><b>The rand Operator : </b> This operator generates random integer - * between 0-30k. <br> - * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br> - * StateFull : No</li> - * <li><b>The calc operator : </b> This operator computes value of pi using - * monte carlo estimation. <br> - * Class : com.datatorrent.demos.pi.PiCalculateOperator <br> - * StateFull : No</li> - * <li><b>The operator Console: </b> This operator just outputs the input tuples - * to the console (or stdout). 
You can use other output adapters if needed.<br> - * </li> - * </ul> - * - * @since 0.3.2 - */
@ApplicationAnnotation(name = "PiDemo")
public class Application implements StreamingApplication
{
  // Always null in this class, so both streams are given setLocality(null).
  private final Locality locality = null;

  /**
   * Adds the "rand", "picalc" and "console" operators and connects them:
   * rand.integer_data to picalc.input, and picalc.output to console.input.
   *
   * @param dag DAG that receives the operators and streams
   * @param conf application configuration; not read by this method
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    final RandomEventGenerator source = dag.addOperator("rand", new RandomEventGenerator());
    final PiCalculateOperator estimator = dag.addOperator("picalc", new PiCalculateOperator());
    final ConsoleOutputOperator sink = dag.addOperator("console", new ConsoleOutputOperator());

    dag.addStream("rand_calc", source.integer_data, estimator.input)
        .setLocality(locality);
    // The stream id says "rand" but the stream carries the calculator output.
    dag.addStream("rand_console", estimator.output, sink.input)
        .setLocality(locality);
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java deleted file mode 100644 index 328bb10..0000000 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationAppData.java +++ /dev/null @@ -1,135 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied.
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.pi; - -import java.net.URI; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.api.Operator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.appdata.schemas.SchemaUtils; -import com.datatorrent.lib.appdata.snapshot.AppDataSnapshotServerMap; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.io.PubSubWebSocketAppDataQuery; -import com.datatorrent.lib.io.PubSubWebSocketAppDataResult; -import com.datatorrent.lib.testbench.RandomEventGenerator; - -/** - * Monte Carlo PI estimation demo : <br> - * This application computes value of PI using Monte Carlo pi estimation - * formula. - * <p> - * Very similar to PiDemo but data is also written to an App Data operator for visualization. - * <p> - * Running Java Test or Main app in IDE: - * - * <pre> - * LocalMode.runApp(new Application(), 600000); // 10 min run - * </pre> - * - * Run Success : <br> - * For successful deployment and run, user should see something like the - * following output on the console (since the input sequence of random numbers - * can vary from one run to the next, there will be some variation in the - * output values): - * - * <pre> - * 3.1430480549199085 - * 3.1423454157782515 - * 3.1431377245508982 - * 3.142078799249531 - * 2013-06-18 10:43:18,335 [main] INFO stram.StramLocalCluster run - Application finished. - * </pre> - * - * Application DAG : <br> - * <img src="doc-files/Application.gif" width=600px > <br> - * <br> - * - * Streaming Window Size : 1000 ms(1 Sec) <br> - * Operator Details : <br> - * <ul> - * <li><b>The rand Operator : </b> This operator generates random integer - * between 0-30k. 
<br> - * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator}<br> - * StateFull : No</li> - * <li><b>The calc operator : </b> This operator computes value of pi using - * monte carlo estimation. <br> - * Class : com.datatorrent.demos.pi.PiCalculateOperator <br> - * StateFull : No</li> - * <li><b>The operator Console: </b> This operator just outputs the input tuples - * to the console (or stdout). You can use other output adapters if needed.<br> - * </li> - * </ul> - * - * @since 0.3.2 - */
@ApplicationAnnotation(name = "PiDemoAppData")
public class ApplicationAppData implements StreamingApplication
{
  public static final String SNAPSHOT_SCHEMA = "PiDemoDataSchema.json";

  // Always null in this class, so the streams below are given setLocality(null).
  private final Locality locality = null;

  /**
   * Wires the pi calculation and publishes its values through a snapshot server.
   * <p>
   * rand feeds picalc; picalc.output fans out to the "adaptor" operator and the
   * console; the adaptor output feeds the snapshot server, whose query results go
   * to a web-socket result operator. Both web-socket endpoints use
   * {@code ws://<gateway address>/pubsub}.
   *
   * @param dag DAG that receives the operators and streams
   * @param conf application configuration; not read by this method
   * @throws RuntimeException if {@link DAG#GATEWAY_CONNECT_ADDRESS} is empty
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    final RandomEventGenerator source = dag.addOperator("rand", new RandomEventGenerator());
    final PiCalculateOperator estimator = dag.addOperator("picalc", new PiCalculateOperator());

    dag.addStream("rand_calc", source.integer_data, estimator.input).setLocality(locality);

    // The gateway address is checked only after the first stream has been added.
    final String gateway = dag.getValue(DAG.GATEWAY_CONNECT_ADDRESS);
    if (StringUtils.isEmpty(gateway)) {
      throw new RuntimeException("Error: No GATEWAY_CONNECT_ADDRESS");
    }
    final URI pubSubUri = URI.create("ws://" + gateway + "/pubsub");

    final AppDataSnapshotServerMap snapshots =
        dag.addOperator("SnapshotServer", new AppDataSnapshotServerMap());
    snapshots.setSnapshotSchemaJSON(SchemaUtils.jarResourceFileToString(SNAPSHOT_SCHEMA));

    // The query operator is not added to the DAG; it is embedded in the snapshot server.
    final PubSubWebSocketAppDataQuery queryOperator = new PubSubWebSocketAppDataQuery();
    queryOperator.enableEmbeddedMode();
    snapshots.setEmbeddableQueryInfoProvider(queryOperator);

    final PubSubWebSocketAppDataResult resultOperator =
        dag.addOperator("QueryResult", new PubSubWebSocketAppDataResult());

    queryOperator.setUri(pubSubUri);
    resultOperator.setUri(pubSubUri);
    final Operator.InputPort<String> resultPort = resultOperator.input;

    final NamedValueList<Object> namer = dag.addOperator("adaptor", new NamedValueList<Object>());
    final ConsoleOutputOperator sink = dag.addOperator("console", new ConsoleOutputOperator());

    dag.addStream("PiValues", estimator.output, namer.inPort, sink.input).setLocality(locality);
    dag.addStream("NamedPiValues", namer.outPort, snapshots.input);
    dag.addStream("Result", snapshots.queryResult, resultPort);
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java deleted file mode 100644 index 0796608..0000000 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/ApplicationWithScript.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ -package com.datatorrent.demos.pi; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.script.JavaScriptOperator; -import com.datatorrent.lib.stream.RoundRobinHashMap; -import com.datatorrent.lib.testbench.RandomEventGenerator; - -/** - * Monte Carlo PI estimation demo : <br> - * This application computes value of PI using Monte Carlo pi estimation - * formula. This demo inputs formula using java script operator. - * - * Running Java Test or Main app in IDE: - * - * <pre> - * LocalMode.runApp(new Application(), 600000); // 10 min run - * </pre> - * - * Run Success : <br> - * For successful deployment and run, user should see following output on - * console: - * <pre> - * 2013-06-25 11:44:25,842 [container-2] DEBUG stram.StramChildAgent updateOperatorStatus - container-2 pendingDeploy [] - * 2013-06-25 11:44:25,929 [ServerHelper-1-1] DEBUG netlet.AbstractClient send - allocating new sendBuffer4Offers of size 16384 for Server.Subscriber{type=rrhm_calc/3.inBindings, mask=0, partitions=null} - * 3.16 - * 3.15 - * 3.1616 - * 3.148 - * 3.1393846153846154 - * </pre> - * - * * Application DAG : <br> - * <img src="doc-files/ApplicationScript.gif" width=600px > <br> - * <br> - * - * Streaming Window Size : 1000 ms(1 Sec) <br> - * Operator Details : <br> - * <ul> - * <li><b>The rand Operator : </b> This operator generates random integer - * between 0-30k. <br> - * Class : {@link com.datatorrent.lib.testbench.RandomEventGenerator} <br> - * StateFull : No</li> - * <li><b>The rrhm Operator : </b> This operator takes input from random generator - * creates tuples of (x,y) in round robin fashion. 
<br> - * Class : {@link com.datatorrent.lib.stream.RoundRobinHashMap} <br> - * StateFull : Yes, tuple is emitted after (x, y) values have been aggregated.</li> - * <li><b>The calc operator : </b> This is java script operator implementing <br> - * Class : {@link com.datatorrent.lib.script.JavaScriptOperator} <br> - * StateFull : No</li> - * <li><b>The operator Console: </b> This operator just outputs the input tuples - * to the console (or stdout). User can use any output adapter. <br> - * .</li> - * </ul> - * - * @since 0.3.2 - */
@ApplicationAnnotation(name = "PiJavaScriptDemo")
public class ApplicationWithScript implements StreamingApplication
{

  /**
   * Builds the DAG rand -> rrhm -> picalc -> console.
   * <p>
   * The generator is bounded to 0..30000, the round-robin map is keyed with
   * "x" and "y", and the script operator is set up with a {@code pi()} function
   * that counts points with {@code x*x+y*y <= 30000*30000} and returns
   * {@code i / count * 4}.
   *
   * @param dag DAG that receives the operators and streams
   * @param conf application configuration; not read by this method
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    final int maxValue = 30000;
    // 900000000 fits in an int, so the product below does not overflow.
    final int radiusSquared = maxValue * maxValue;

    final RandomEventGenerator source = dag.addOperator("rand", new RandomEventGenerator());
    source.setMinvalue(0);
    source.setMaxvalue(maxValue);

    final RoundRobinHashMap<String, Object> pairer =
        dag.addOperator("rrhm", new RoundRobinHashMap<String, Object>());
    pairer.setKeys(new String[] {"x", "y"});

    final JavaScriptOperator script = dag.addOperator("picalc", new JavaScriptOperator());
    script.setPassThru(false);
    // Script variables "i" and "count" start at zero and are updated by pi().
    script.put("i", 0);
    script.put("count", 0);
    script.addSetupScript(
        "function pi() { if (x*x+y*y <= " + radiusSquared + ") { i++; } count++; return i / count * 4; }");
    script.setInvoke("pi");

    dag.addStream("rand_rrhm", source.integer_data, pairer.data);
    dag.addStream("rrhm_calc", pairer.map, script.inBindings);

    final ConsoleOutputOperator sink = dag.addOperator("console", new ConsoleOutputOperator());
    dag.addStream("rand_console", script.result, sink.input);
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java deleted file mode 100644 index
221ecc0..0000000 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/Calculator.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.pi; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.math.Division; -import com.datatorrent.lib.math.LogicalCompareToConstant; -import com.datatorrent.lib.math.MultiplyByConstant; -import com.datatorrent.lib.math.RunningAverage; -import com.datatorrent.lib.math.Sigma; -import com.datatorrent.lib.math.SquareCalculus; -import com.datatorrent.lib.stream.AbstractAggregator; -import com.datatorrent.lib.stream.ArrayListAggregator; -import com.datatorrent.lib.stream.Counter; -import com.datatorrent.lib.testbench.RandomEventGenerator; - -/** - * <p>Calculator class.</p> - * - * @since 0.3.2 - */ -@ApplicationAnnotation(name = "PiLibraryDemo") -public class Calculator implements StreamingApplication -{ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - /* keep generating random 
values between 0 and 30000 */ - RandomEventGenerator xyGenerator = dag.addOperator("GenerateX", RandomEventGenerator.class); - - /* calculate square of each of the values it receives */ - SquareCalculus squareOperator = dag.addOperator("SquareX", SquareCalculus.class); - - /* pair the consecutive values */ - AbstractAggregator<Integer> pairOperator = dag.addOperator("PairXY", new ArrayListAggregator<Integer>()); - Sigma<Integer> sumOperator = dag.addOperator("SumXY", new Sigma<Integer>()); - LogicalCompareToConstant<Integer> comparator = dag.addOperator("AnalyzeLocation", new LogicalCompareToConstant<Integer>()); - comparator.setConstant(30000 * 30000); - Counter inCircle = dag.addOperator("CountInCircle", Counter.class); - Counter inSquare = dag.addOperator("CountInSquare", Counter.class); - Division division = dag.addOperator("Ratio", Division.class); - MultiplyByConstant multiplication = dag.addOperator("InstantPI", MultiplyByConstant.class); - multiplication.setMultiplier(4); - RunningAverage average = dag.addOperator("AveragePI", new RunningAverage()); - ConsoleOutputOperator oper = dag.addOperator("Console", new ConsoleOutputOperator()); - - dag.addStream("x", xyGenerator.integer_data, squareOperator.input); - dag.addStream("sqr", squareOperator.integerResult, pairOperator.input); - dag.addStream("x2andy2", pairOperator.output, sumOperator.input); - dag.addStream("x2plusy2", sumOperator.integerResult, comparator.input, inSquare.input); - dag.addStream("inCirclePoints", comparator.greaterThan, inCircle.input); - dag.addStream("numerator", inCircle.output, division.numerator); - dag.addStream("denominator", inSquare.output, division.denominator); - dag.addStream("ratio", division.doubleQuotient, multiplication.input); - dag.addStream("instantPi", multiplication.doubleProduct, average.input); - dag.addStream("averagePi", average.doubleAverage, oper.input); - - } - -} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java deleted file mode 100644 index c50e17e..0000000 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/NamedValueList.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */
package com.datatorrent.demos.pi;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.validation.constraints.NotNull;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;

/**
 * <p>An operator which converts a raw value to a named value singleton list.</p>
 * {@code AppDataSnapshotServerMap.input} accepts a {@code List<Map<String,Object>>}, so this
 * operator stores each incoming value under {@code valueName} in a single-entry map and
 * emits a singleton list holding that map.
 *
 * @displayName Named Value
 * @tags count
 * @since 3.2.0
 */
public class NamedValueList<T> extends BaseOperator
{
  /** Key under which every incoming value is stored. */
  @NotNull
  private String valueName;

  // Both containers are created once in setup() and reused for every tuple.
  private List<Map<String, T>> valueList;
  private Map<String, T> valueMap;

  /** Receives raw values; each one overwrites the map entry and triggers an emit. */
  public final transient DefaultInputPort<T> inPort = new DefaultInputPort<T>()
  {
    @Override
    public void process(T tuple)
    {
      // The same list instance is emitted every time; only the map entry changes.
      valueMap.put(valueName, tuple);
      outPort.emit(valueList);
    }
  };

  /** Emits the singleton list that wraps the named value map. */
  public final transient DefaultOutputPort<List<Map<String, T>>> outPort = new DefaultOutputPort<>();

  /**
   * Creates the map with a {@code null} entry for {@code valueName} and wraps it
   * in an immutable singleton list.
   *
   * @param context operator context; not read by this method
   */
  @Override
  public void setup(OperatorContext context)
  {
    Map<String, T> named = new HashMap<>();
    named.put(valueName, null);
    valueMap = named;
    valueList = Collections.singletonList(named);
  }

  /** No-op. */
  @Override
  public void teardown()
  {
  }

  /** No-op. */
  @Override
  public void beginWindow(long windowId)
  {
  }

  /** No-op. */
  @Override
  public void endWindow()
  {
  }

  /**
   * @return the key under which values are stored
   */
  public String getValueName()
  {
    return valueName;
  }

  /**
   * @param name the key under which values are stored
   */
  public void setValueName(String name)
  {
    this.valueName = name;
  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java
b/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java deleted file mode 100644 index 8e61991..0000000 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/PiCalculateOperator.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.demos.pi; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; - -/** - * This operator implements Monte Carlo estimation of pi. For points randomly distributed points on - * square circle. pi ~= Number of points in circle/Total number of points * 4. 
- * - * @since 0.3.2 - */
public class PiCalculateOperator extends BaseOperator
{
  private static final Logger logger = LoggerFactory.getLogger(PiCalculateOperator.class);

  // Pending coordinates of the current point; -1 marks "not received yet".
  // A tuple equal to -1 is therefore treated as "no x yet".
  private transient int x = -1;
  private transient int y = -1;
  // Upper bound compared with x*x + y*y, i.e. a squared distance.
  private int base;
  // Points that satisfied the bound, and all completed points.
  private long inArea = 0;
  private long totalArea = 0;

  /**
   * Receives coordinates one at a time. The first tuple of a pair is stored as x;
   * the second completes the point, updates the counters and resets the pair.
   */
  public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
  {
    @Override
    public void process(Integer tuple)
    {
      if (x == -1) {
        x = tuple;
        return;
      }
      y = tuple;
      // Widen to long before multiplying: int arithmetic would silently
      // overflow once a coordinate exceeds roughly 32767.
      long distanceSquared = (long)x * x + (long)y * y;
      if (distanceSquared <= base) {
        inArea++;
      }
      totalArea++;
      x = y = -1;
    }

  };

  /** Emits the current estimate, {@code inArea / totalArea * 4}, at the end of each window. */
  public final transient DefaultOutputPort<Double> output = new DefaultOutputPort<Double>();

  /**
   * Logs the counters the operator starts with.
   *
   * @param context operator context; not read by this method
   */
  @Override
  public void setup(OperatorContext context)
  {
    logger.info("inArea {} totalArea {}", inArea, totalArea);
  }

  /** No-op. */
  @Override
  public void beginWindow(long windowId)
  {
  }

  /**
   * Emits the running estimate. Nothing is emitted until at least one point has
   * been completed, because 0.0 / 0 would otherwise produce NaN.
   */
  @Override
  public void endWindow()
  {
    if (totalArea == 0) {
      return;
    }
    output.emit((double)inArea / totalArea * 4);
  }

  /**
   * @param num bound compared with {@code x*x + y*y} for each point
   */
  public void setBase(int num)
  {
    base = num;
  }

  /**
   * @return bound compared with {@code x*x + y*y} for each point
   */
  public int getBase()
  {
    return base;
  }

}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/java/com/datatorrent/demos/pi/doc-files/Application.gif ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/doc-files/Application.gif b/demos/pi/src/main/java/com/datatorrent/demos/pi/doc-files/Application.gif deleted file mode 100644 index 9545c6c..0000000 Binary files a/demos/pi/src/main/java/com/datatorrent/demos/pi/doc-files/Application.gif and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/java/com/datatorrent/demos/pi/doc-files/ApplicationScript.gif ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/doc-files/ApplicationScript.gif
b/demos/pi/src/main/java/com/datatorrent/demos/pi/doc-files/ApplicationScript.gif deleted file mode 100644 index 29f9ef8..0000000 Binary files a/demos/pi/src/main/java/com/datatorrent/demos/pi/doc-files/ApplicationScript.gif and /dev/null differ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/java/com/datatorrent/demos/pi/package-info.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/java/com/datatorrent/demos/pi/package-info.java b/demos/pi/src/main/java/com/datatorrent/demos/pi/package-info.java deleted file mode 100644 index 11614d6..0000000 --- a/demos/pi/src/main/java/com/datatorrent/demos/pi/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -/** - * Pi calculation demonstration application. 
- */ -package com.datatorrent.demos.pi; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/resources/META-INF/properties.xml b/demos/pi/src/main/resources/META-INF/properties.xml deleted file mode 100644 index 96d097d..0000000 --- a/demos/pi/src/main/resources/META-INF/properties.xml +++ /dev/null @@ -1,109 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <!-- Memory settings for all demos --> - <property> - <name>dt.attr.MASTER_MEMORY_MB</name> - <value>1024</value> - </property> - <property> - <name>dt.application.*.operator.*.attr.MEMORY_MB</name> - <value>256</value> - </property> - <property> - <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name> - <value>-Xmx128M</value> - </property> - <property> - <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> - <value>128</value> - </property> - - <!-- PiDemo --> - <property> - <name>dt.application.PiDemo.operator.rand.minvalue</name> - <value>0</value> - </property> - <property> - <name>dt.application.PiDemo.operator.rand.maxvalue</name> - <value>30000</value> - </property> - <property> - <name>dt.application.PiDemo.operator.picalc.base</name> - <value>900000000</value> - </property> - <property> - <name>dt.application.PiDemo.operator.adaptor.valueName</name> - <value>piValue</value> - </property> - - <!-- PiDemoAppData --> - <property> - <name>dt.application.PiDemoAppData.operator.rand.minvalue</name> - <value>0</value> - </property> - <property> - <name>dt.application.PiDemoAppData.operator.rand.maxvalue</name> - <value>30000</value> - </property> - <property> - <name>dt.application.PiDemoAppData.operator.picalc.base</name> - <value>900000000</value> - </property> - <property> - <name>dt.application.PiDemoAppData.operator.adaptor.valueName</name> - <value>piValue</value> - </property> - <property> - <name>dt.application.PiDemoAppData.operator.Query.topic</name> - <value>PiDemoQuery</value> - </property> - <property> - <name>dt.application.PiDemoAppData.operator.QueryResult.topic</name> - <value>PiDemoQueryResult</value> - </property> - <property> - <name>dt.application.PiDemoAppData.operator.SnapshotServer.embeddableQueryInfoProvider.topic</name> - <value>PiDemoQuery</value> - </property> - - <!-- PiLibraryDemo --> - <property> - <name>dt.application.PiLibraryDemo.operator.GenerateX.minvalue</name> - <value>0</value> 
- </property> - <property> - <name>dt.application.PiLibraryDemo.operator.GenerateX.maxvalue</name> - <value>30000</value> - </property> - <property> - <name>dt.application.PiLibraryDemo.operator.PairXY.size</name> - <value>2</value> - </property> - - <!-- PiJavaScriptDemo --> - <property> - <name>dt.application.PiJavaScriptDemo.operator.rand.tuplesBlast</name> - <value>9</value> - </property> - -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/resources/PiDemoDataSchema.json ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/resources/PiDemoDataSchema.json b/demos/pi/src/main/resources/PiDemoDataSchema.json deleted file mode 100644 index 47db8eb..0000000 --- a/demos/pi/src/main/resources/PiDemoDataSchema.json +++ /dev/null @@ -1,3 +0,0 @@ -{ - "values": [{"name": "piValue", "type": "double"}] -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/main/resources/app/PiJsonDemo.json ---------------------------------------------------------------------- diff --git a/demos/pi/src/main/resources/app/PiJsonDemo.json b/demos/pi/src/main/resources/app/PiJsonDemo.json deleted file mode 100644 index c077213..0000000 --- a/demos/pi/src/main/resources/app/PiJsonDemo.json +++ /dev/null @@ -1,52 +0,0 @@ -{ - "description": "Pi JSON Demo that is intended to demonstrate the capability of specifying an app using JSON", - "operators": [ - { - "name": "rand", - "class": "com.datatorrent.lib.testbench.RandomEventGenerator", - "properties": { - "minvalue": 0, - "maxvalue": 30000 - } - }, - { - "name": "picalc", - "class": "com.datatorrent.demos.pi.PiCalculateOperator", - "properties": { - "base": 900000000 - } - }, - { - "name": "console", - "class": "com.datatorrent.lib.io.ConsoleOutputOperator" - } - ], - "streams": [ - { - "name": "rand_calc", - "source": { - "operatorName": "rand", - "portName": "integer_data" - }, - "sinks": [ - { - "operatorName": 
"picalc", - "portName": "input" - } - ] - }, - { - "name": "calc_console", - "source": { - "operatorName": "picalc", - "portName": "output" - }, - "sinks": [ - { - "operatorName": "console", - "portName": "input" - } - ] - } - ] -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java deleted file mode 100644 index d8881c2..0000000 --- a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.pi; - -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -/** - * - */ -public class ApplicationTest -{ - @Test - public void testSomeMethod() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource("dt-site-pi.xml"); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationWithScriptTest.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationWithScriptTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationWithScriptTest.java deleted file mode 100644 index 8f35e5a..0000000 --- a/demos/pi/src/test/java/com/datatorrent/demos/pi/ApplicationWithScriptTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.pi; - -import org.junit.Test; - -import com.datatorrent.api.LocalMode; - -/** - * - */ -public class ApplicationWithScriptTest -{ - @Test - public void testSomeMethod() throws Exception - { - LocalMode.runApp(new ApplicationWithScript(), 10000); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java deleted file mode 100644 index 21079d7..0000000 --- a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.demos.pi; - -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -/** - * - */ -public class CalculatorTest -{ - @Test - public void testSomeMethod() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource("dt-site-pilibrary.xml"); - lma.prepareDAG(new Calculator(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/test/resources/dt-site-pi.xml ---------------------------------------------------------------------- diff --git a/demos/pi/src/test/resources/dt-site-pi.xml b/demos/pi/src/test/resources/dt-site-pi.xml deleted file mode 100644 index 6230cdd..0000000 --- a/demos/pi/src/test/resources/dt-site-pi.xml +++ /dev/null @@ -1,40 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" standalone="no"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <property> - <name>dt.application.PiDemo.class</name> - <value>com.datatorrent.demos.pi.Application</value> - <description>An alias for the application</description> -</property> -<property> - <name>dt.application.PiDemo.operator.rand.minvalue</name> - <value>0</value> -</property> -<property> - <name>dt.application.PiDemo.operator.rand.maxvalue</name> - <value>30000</value> -</property> -<property> - <name>dt.application.PiDemo.operator.picalc.base</name> - <value>900000000</value> -</property> -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/test/resources/dt-site-pilibrary.xml ---------------------------------------------------------------------- diff --git a/demos/pi/src/test/resources/dt-site-pilibrary.xml b/demos/pi/src/test/resources/dt-site-pilibrary.xml deleted file mode 100644 index 01c7ef3..0000000 --- a/demos/pi/src/test/resources/dt-site-pilibrary.xml +++ /dev/null @@ -1,45 +0,0 @@ -<!-- - - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, - software distributed under the License is distributed on an - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - KIND, either express or implied. See the License for the - specific language governing permissions and limitations - under the License. 
- ---> -<configuration> - <property> - <name>dt.application.PiLibraryDemo.class</name> - <value>com.datatorrent.demos.pi.Calculator</value> - <description>An alias for the application</description> -</property> -<property> - <name>dt.application.PiLibraryDemo.operator.GenerateX.minvalue</name> - <value>0</value> -</property> -<property> - <name>dt.application.PiLibraryDemo.operator.GenerateX.maxvalue</name> - <value>30000</value> -</property> -<property> - <name>dt.application.PiLibraryDemo.operator.PairXY.size</name> - <value>2</value> -</property> -<!-- -<property> - <name>dt.application.PiLibraryDemo.operator.AnalyzeLocation.constant</name> - <value>900000000</value> -</property> --> - -</configuration> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/d5bf96ca/demos/pi/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/pi/src/test/resources/log4j.properties b/demos/pi/src/test/resources/log4j.properties deleted file mode 100644 index cf0d19e..0000000 --- a/demos/pi/src/test/resources/log4j.properties +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
-# - -log4j.rootLogger=DEBUG,CONSOLE - -log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender -log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout -log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.CONSOLE.threshold=${test.log.console.threshold} -test.log.console.threshold=DEBUG - -log4j.appender.RFA=org.apache.log4j.RollingFileAppender -log4j.appender.RFA.layout=org.apache.log4j.PatternLayout -log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n -log4j.appender.RFA.File=/tmp/app.log - -# to enable, add SYSLOG to rootLogger -log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender -log4j.appender.SYSLOG.syslogHost=127.0.0.1 -log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout -log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n -log4j.appender.SYSLOG.Facility=LOCAL1 - -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn -log4j.logger.com.datatorrent=debug -log4j.logger.org.apache.apex=debug
