Repository: incubator-apex-malhar Updated Branches: refs/heads/master ca5ab1124 -> 8331f56da
APEXCORE-60 added iteration demo Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/d13c6f77 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/d13c6f77 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/d13c6f77 Branch: refs/heads/master Commit: d13c6f77c5c1ec1dfc4387bb7b6d6ecb220831d5 Parents: 9c557fc Author: David Yan <[email protected]> Authored: Thu Nov 19 13:20:38 2015 -0800 Committer: David Yan <[email protected]> Committed: Tue Feb 23 10:03:29 2016 -0800 ---------------------------------------------------------------------- demos/iteration/pom.xml | 37 ++++ demos/iteration/src/assemble/appPackage.xml | 59 +++++++ .../demos/iteration/Application.java | 168 +++++++++++++++++++ .../demos/iteration/package-info.java | 22 +++ .../src/main/resources/META-INF/properties.xml | 44 +++++ .../demos/iteration/ApplicationTest.java | 86 ++++++++++ .../src/test/resources/log4j.properties | 40 +++++ demos/pom.xml | 1 + 8 files changed, 457 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/pom.xml ---------------------------------------------------------------------- diff --git a/demos/iteration/pom.xml b/demos/iteration/pom.xml new file mode 100644 index 0000000..5891f42 --- /dev/null +++ b/demos/iteration/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <artifactId>iteration-demo</artifactId> + <packaging>jar</packaging> + + <name>Apache Apex Malhar (incubating) Iteration Demo</name> + <description>DataTorrent demo applications that demonstrates the iteration feature.</description> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-demos</artifactId> + <version>3.4.0-incubating-SNAPSHOT</version> + </parent> + +</project> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/demos/iteration/src/assemble/appPackage.xml b/demos/iteration/src/assemble/appPackage.xml new file mode 100644 index 0000000..4138cf2 --- /dev/null +++ b/demos/iteration/src/assemble/appPackage.xml @@ -0,0 +1,59 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java ---------------------------------------------------------------------- diff --git a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java new file mode 100644 index 0000000..c0178d8 --- /dev/null +++ b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/Application.java @@ -0,0 +1,168 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.demos.iteration; + +import com.datatorrent.api.Context; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.common.util.DefaultDelayOperator; +import com.datatorrent.lib.testbench.RandomEventGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +/** + * Iteration demo : <br> + * + * <pre> + * LocalMode.runApp(new Application(), 600000); // 10 min run + * </pre> + * + * Run Success : <br> + * For successful deployment and run, user should see the Fibonacci sequence, something like the + * following output on the console: + * + * <pre> + * 1 + * 1 + * 2 + * 3 + * 5 + * 8 + * 13 + * 21 + * 34 + * 55 + * ... + * </pre> + * + */ +@ApplicationAnnotation(name="IterationDemo") +public class Application implements StreamingApplication +{ + private final static Logger LOG = LoggerFactory.getLogger(Application.class); + private String extraOutputFileName; // for unit test + + public static class FibonacciOperator extends BaseOperator + { + public long currentNumber = 1; + private transient long tempNum; + public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>() + { + @Override + public void process(Object tuple) + { + } + }; + public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>() + { + @Override + public void process(Long tuple) + { + tempNum = (currentNumber == 1) ? 1 : tuple; + } + }; + public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>(); + + + @Override + public void endWindow() + { + output.emit(currentNumber); + currentNumber += tempNum; + if (currentNumber <= 0) { + currentNumber = 1; + } + } + } + + public static class StdoutOperator extends BaseOperator + { + private String extraOutputFileName; // for unit test + private transient PrintStream extraOutputStream; + /** + * This is the input port which receives the tuples that will be written to stdout. + */ + public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() + { + @Override + @SuppressWarnings("UseOfSystemOutOrSystemErr") + public void process(Object t) + { + String s = t.toString(); + System.out.println(s); + if (extraOutputStream != null) { + extraOutputStream.println(s); + } + } + }; + + @Override + public void setup(Context.OperatorContext context) + { + if (extraOutputFileName != null) { + try { + extraOutputStream = new PrintStream(new FileOutputStream(extraOutputFileName), true); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + } + } + + @Override + public void teardown() + { + extraOutputStream.close(); + } + + public void setExtraOutputFileName(String fileName) + { + this.extraOutputFileName = fileName; + } + } + + public void setExtraOutputFileName(String fileName) + { + this.extraOutputFileName = fileName; + } + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator()); + FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class); + DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class); + StdoutOperator console = new StdoutOperator(); + console.setExtraOutputFileName(extraOutputFileName); + dag.addOperator("console", console); + dag.addStream("dummy_to_operator", rand.integer_data, fib.dummyInputPort); + dag.addStream("operator_to_delay", fib.output, opDelay.input, console.input); + dag.addStream("delay_to_operator", opDelay.output, fib.input); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java ---------------------------------------------------------------------- diff --git a/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java new file mode 100644 index 0000000..0d24638 --- /dev/null +++ b/demos/iteration/src/main/java/com/datatorrent/demos/iteration/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Iteration demonstration application. + */ +package com.datatorrent.demos.iteration; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/demos/iteration/src/main/resources/META-INF/properties.xml b/demos/iteration/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..bf65e22 --- /dev/null +++ b/demos/iteration/src/main/resources/META-INF/properties.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8" standalone="no"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<configuration> + <!-- Memory settings for all demos --> + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>512</value> + </property> + <property> + <name>dt.attr.DEBUG</name> + <value>true</value> + <property> + <name>dt.application.*.operator.*.attr.MEMORY_MB</name> + <value>128</value> + </property> + <property> + <name>dt.application.*.operator.*.attr.JVM_OPTIONS</name> + <value>-Xmx128M</value> + </property> + <property> + <name>dt.application.*.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> + <value>128</value> + </property> + +</configuration> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java new file mode 100644 index 0000000..7804fcd --- /dev/null +++ b/demos/iteration/src/test/java/com/datatorrent/demos/iteration/ApplicationTest.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.demos.iteration; + + +import org.apache.hadoop.conf.Configuration; +import org.junit.Assert; +import org.junit.Test; + +import com.datatorrent.api.LocalMode; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; + + +/** + * + */ +public class ApplicationTest +{ + @Test + public void testIterationApp() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + Application app = new Application(); + String outputFileName = "target/output.txt"; + long timeout = 10 * 1000; // 10 seconds + + new File(outputFileName).delete(); + app.setExtraOutputFileName(outputFileName); + lma.prepareDAG(app, conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + long startTime = System.currentTimeMillis(); + do { + try { + Thread.sleep(500); + } catch (InterruptedException ex) { + break; + } + File file = new File(outputFileName); + if (file.length() > 50) { + break; + } + } while (System.currentTimeMillis() - startTime < timeout); + + lc.shutdown(); + try (BufferedReader br = new BufferedReader(new FileReader(outputFileName))) { + Assert.assertEquals("1", br.readLine()); + Assert.assertEquals("1", br.readLine()); + Assert.assertEquals("2", br.readLine()); + Assert.assertEquals("3", br.readLine()); + Assert.assertEquals("5", br.readLine()); + Assert.assertEquals("8", br.readLine()); + Assert.assertEquals("13", br.readLine()); + Assert.assertEquals("21", br.readLine()); + Assert.assertEquals("34", br.readLine()); + Assert.assertEquals("55", br.readLine()); + Assert.assertEquals("89", br.readLine()); + Assert.assertEquals("144", br.readLine()); + Assert.assertEquals("233", br.readLine()); + Assert.assertEquals("377", br.readLine()); + Assert.assertEquals("610", br.readLine()); + Assert.assertEquals("987", br.readLine()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/iteration/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/demos/iteration/src/test/resources/log4j.properties b/demos/iteration/src/test/resources/log4j.properties new file mode 100644 index 0000000..451cff3 --- /dev/null +++ b/demos/iteration/src/test/resources/log4j.properties @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/d13c6f77/demos/pom.xml ---------------------------------------------------------------------- diff --git a/demos/pom.xml b/demos/pom.xml index e650ea2..032583a 100644 --- a/demos/pom.xml +++ b/demos/pom.xml @@ -184,6 +184,7 @@ <module>uniquecount</module> <module>r</module> <module>echoserver</module> + <module>iteration</module> </modules> <dependencies>
