http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/innerjoin/src/main/java/com/example/join/POJOGenerator.java ---------------------------------------------------------------------- diff --git a/examples/innerjoin/src/main/java/com/example/join/POJOGenerator.java b/examples/innerjoin/src/main/java/com/example/join/POJOGenerator.java deleted file mode 100644 index 613c80e..0000000 --- a/examples/innerjoin/src/main/java/com/example/join/POJOGenerator.java +++ /dev/null @@ -1,260 +0,0 @@ -package com.example.join; - -import java.util.Random; - -import javax.validation.constraints.Min; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; - -/** - * Generates and emits the SalesEvent/ProductEvent based on isSalesEvent. - */ -public class POJOGenerator implements InputOperator -{ - @Min(1) - private int maxProductId = 100000; - @Min(1) - private int maxCustomerId = 100000; - @Min(1) - private int maxProductCategories = 100; - private double maxAmount = 100.0; - private long tuplesCounter; - private long time; - private long timeIncrement; - private boolean isSalesEvent = true; - // Limit number of emitted tuples per window - @Min(0) - private long maxTuplesPerWindow = 100; - private final Random random = new Random(); - public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>(); - - @Override - public void beginWindow(long windowId) - { - tuplesCounter = 0; - } - - @Override - public void endWindow() - { - time += timeIncrement; - } - - @Override - public void setup(Context.OperatorContext context) - { - time = System.currentTimeMillis(); - timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * - context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); - } - - @Override - public void teardown() - { - - } - - SalesEvent generateSalesEvent() throws Exception { - - SalesEvent salesEvent = new SalesEvent(); - 
salesEvent.productId = randomId(maxProductId); - salesEvent.customerId = randomId(maxCustomerId); - salesEvent.amount = randomAmount(); - salesEvent.timestamp = time; - return salesEvent; - } - - ProductEvent generateProductEvent() throws Exception { - ProductEvent productEvent = new ProductEvent(); - productEvent.productId = randomId(maxProductId); - productEvent.productCategory = randomId(maxProductCategories); - productEvent.timestamp = time; - return productEvent; - } - - private int randomId(int max) { - if (max < 1) return 1; - return 1 + random.nextInt(max); - } - - private double randomAmount() { - return maxAmount * random.nextDouble(); - } - - @Override - public void emitTuples() - { - while (tuplesCounter++ < maxTuplesPerWindow) { - try { - if (isSalesEvent) { - SalesEvent event = generateSalesEvent(); - this.output.emit(event); - } else { - ProductEvent event = generateProductEvent(); - this.output.emit(event); - } - - } catch (Exception ex) { - throw new RuntimeException(ex); - } - } - } - - public static class SalesEvent - { - public int customerId; - public int productId; - public int productCategory; - public double amount; - public long timestamp; - - public int getCustomerId() - { - return customerId; - } - - public void setCustomerId(int customerId) - { - this.customerId = customerId; - } - - public int getProductId() - { - return productId; - } - - public void setProductId(int productId) - { - this.productId = productId; - } - - public int getProductCategory() - { - return productCategory; - } - - public void setProductCategory(int productCategory) - { - this.productCategory = productCategory; - } - - public double getAmount() - { - return amount; - } - - public void setAmount(double amount) - { - this.amount = amount; - } - - public long getTimestamp() - { - return timestamp; - } - - public void setTimestamp(long timestamp) - { - this.timestamp = timestamp; - } - } - - public static class ProductEvent - { - public int productId; - public int 
productCategory; - public long timestamp; - - public int getProductId() - { - return productId; - } - - public void setProductId(int productId) - { - this.productId = productId; - } - - public int getProductCategory() - { - return productCategory; - } - - public void setProductCategory(int productCategory) - { - this.productCategory = productCategory; - } - - public long getTimestamp() - { - return timestamp; - } - - public void setTimestamp(long timestamp) - { - this.timestamp = timestamp; - } - } - - public int getMaxProductId() - { - return maxProductId; - } - - public void setMaxProductId(int maxProductId) - { - this.maxProductId = maxProductId; - } - - public int getMaxCustomerId() - { - return maxCustomerId; - } - - public void setMaxCustomerId(int maxCustomerId) - { - this.maxCustomerId = maxCustomerId; - } - - public int getMaxProductCategories() - { - return maxProductCategories; - } - - public void setMaxProductCategories(int maxProductCategories) - { - this.maxProductCategories = maxProductCategories; - } - - public double getMaxAmount() - { - return maxAmount; - } - - public void setMaxAmount(double maxAmount) - { - this.maxAmount = maxAmount; - } - - public boolean isSalesEvent() - { - return isSalesEvent; - } - - public void setSalesEvent(boolean salesEvent) - { - isSalesEvent = salesEvent; - } - - public long getMaxTuplesPerWindow() - { - return maxTuplesPerWindow; - } - - public void setMaxTuplesPerWindow(long maxTuplesPerWindow) - { - this.maxTuplesPerWindow = maxTuplesPerWindow; - } -}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/innerjoin/src/main/java/org/apache/apex/examples/innerjoin/InnerJoinApplication.java ---------------------------------------------------------------------- diff --git a/examples/innerjoin/src/main/java/org/apache/apex/examples/innerjoin/InnerJoinApplication.java b/examples/innerjoin/src/main/java/org/apache/apex/examples/innerjoin/InnerJoinApplication.java new file mode 100644 index 0000000..3e7c8d5 --- /dev/null +++ b/examples/innerjoin/src/main/java/org/apache/apex/examples/innerjoin/InnerJoinApplication.java @@ -0,0 +1,38 @@ +package org.apache.apex.examples.innerjoin; + +import org.apache.apex.malhar.lib.join.POJOInnerJoinOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +@ApplicationAnnotation(name="InnerJoinExample") +public class InnerJoinApplication implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // SalesEvent Generator + POJOGenerator salesGenerator = dag.addOperator("Input1", new POJOGenerator()); + // ProductEvent Generator + POJOGenerator productGenerator = dag.addOperator("Input2", new POJOGenerator()); + productGenerator.setSalesEvent(false); + + // Inner join Operator + POJOInnerJoinOperator join = dag.addOperator("Join", new POJOInnerJoinOperator()); + ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); + + // Streams + dag.addStream("SalesToJoin", salesGenerator.output, join.input1); + dag.addStream("ProductToJoin", productGenerator.output, join.input2); + dag.addStream("JoinToConsole", join.outputPort, output.input); + + // Setting tuple class properties to the ports of join operator + dag.setInputPortAttribute(join.input1, 
Context.PortContext.TUPLE_CLASS, POJOGenerator.SalesEvent.class); + dag.setInputPortAttribute(join.input2, Context.PortContext.TUPLE_CLASS, POJOGenerator.ProductEvent.class); + dag.setOutputPortAttribute(join.outputPort,Context.PortContext.TUPLE_CLASS, POJOGenerator.SalesEvent.class); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/innerjoin/src/main/java/org/apache/apex/examples/innerjoin/POJOGenerator.java ---------------------------------------------------------------------- diff --git a/examples/innerjoin/src/main/java/org/apache/apex/examples/innerjoin/POJOGenerator.java b/examples/innerjoin/src/main/java/org/apache/apex/examples/innerjoin/POJOGenerator.java new file mode 100644 index 0000000..2a736c5 --- /dev/null +++ b/examples/innerjoin/src/main/java/org/apache/apex/examples/innerjoin/POJOGenerator.java @@ -0,0 +1,260 @@ +package org.apache.apex.examples.innerjoin; + +import java.util.Random; + +import javax.validation.constraints.Min; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; + +/** + * Generates and emits the SalesEvent/ProductEvent based on isSalesEvent. 
+ */ +public class POJOGenerator implements InputOperator +{ + @Min(1) + private int maxProductId = 100000; + @Min(1) + private int maxCustomerId = 100000; + @Min(1) + private int maxProductCategories = 100; + private double maxAmount = 100.0; + private long tuplesCounter; + private long time; + private long timeIncrement; + private boolean isSalesEvent = true; + // Limit number of emitted tuples per window + @Min(0) + private long maxTuplesPerWindow = 100; + private final Random random = new Random(); + public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>(); + + @Override + public void beginWindow(long windowId) + { + tuplesCounter = 0; + } + + @Override + public void endWindow() + { + time += timeIncrement; + } + + @Override + public void setup(Context.OperatorContext context) + { + time = System.currentTimeMillis(); + timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) * + context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS); + } + + @Override + public void teardown() + { + + } + + SalesEvent generateSalesEvent() throws Exception { + + SalesEvent salesEvent = new SalesEvent(); + salesEvent.productId = randomId(maxProductId); + salesEvent.customerId = randomId(maxCustomerId); + salesEvent.amount = randomAmount(); + salesEvent.timestamp = time; + return salesEvent; + } + + ProductEvent generateProductEvent() throws Exception { + ProductEvent productEvent = new ProductEvent(); + productEvent.productId = randomId(maxProductId); + productEvent.productCategory = randomId(maxProductCategories); + productEvent.timestamp = time; + return productEvent; + } + + private int randomId(int max) { + if (max < 1) return 1; + return 1 + random.nextInt(max); + } + + private double randomAmount() { + return maxAmount * random.nextDouble(); + } + + @Override + public void emitTuples() + { + while (tuplesCounter++ < maxTuplesPerWindow) { + try { + if (isSalesEvent) { + SalesEvent event = generateSalesEvent(); + 
this.output.emit(event); + } else { + ProductEvent event = generateProductEvent(); + this.output.emit(event); + } + + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + } + + public static class SalesEvent + { + public int customerId; + public int productId; + public int productCategory; + public double amount; + public long timestamp; + + public int getCustomerId() + { + return customerId; + } + + public void setCustomerId(int customerId) + { + this.customerId = customerId; + } + + public int getProductId() + { + return productId; + } + + public void setProductId(int productId) + { + this.productId = productId; + } + + public int getProductCategory() + { + return productCategory; + } + + public void setProductCategory(int productCategory) + { + this.productCategory = productCategory; + } + + public double getAmount() + { + return amount; + } + + public void setAmount(double amount) + { + this.amount = amount; + } + + public long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(long timestamp) + { + this.timestamp = timestamp; + } + } + + public static class ProductEvent + { + public int productId; + public int productCategory; + public long timestamp; + + public int getProductId() + { + return productId; + } + + public void setProductId(int productId) + { + this.productId = productId; + } + + public int getProductCategory() + { + return productCategory; + } + + public void setProductCategory(int productCategory) + { + this.productCategory = productCategory; + } + + public long getTimestamp() + { + return timestamp; + } + + public void setTimestamp(long timestamp) + { + this.timestamp = timestamp; + } + } + + public int getMaxProductId() + { + return maxProductId; + } + + public void setMaxProductId(int maxProductId) + { + this.maxProductId = maxProductId; + } + + public int getMaxCustomerId() + { + return maxCustomerId; + } + + public void setMaxCustomerId(int maxCustomerId) + { + this.maxCustomerId = maxCustomerId; + } + + 
public int getMaxProductCategories() + { + return maxProductCategories; + } + + public void setMaxProductCategories(int maxProductCategories) + { + this.maxProductCategories = maxProductCategories; + } + + public double getMaxAmount() + { + return maxAmount; + } + + public void setMaxAmount(double maxAmount) + { + this.maxAmount = maxAmount; + } + + public boolean isSalesEvent() + { + return isSalesEvent; + } + + public void setSalesEvent(boolean salesEvent) + { + isSalesEvent = salesEvent; + } + + public long getMaxTuplesPerWindow() + { + return maxTuplesPerWindow; + } + + public void setMaxTuplesPerWindow(long maxTuplesPerWindow) + { + this.maxTuplesPerWindow = maxTuplesPerWindow; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/innerjoin/src/test/java/com/example/join/InnerJoinApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/innerjoin/src/test/java/com/example/join/InnerJoinApplicationTest.java b/examples/innerjoin/src/test/java/com/example/join/InnerJoinApplicationTest.java deleted file mode 100644 index 6da3908..0000000 --- a/examples/innerjoin/src/test/java/com/example/join/InnerJoinApplicationTest.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.example.join; - -import org.junit.Test; -import org.apache.hadoop.conf.Configuration; -import com.datatorrent.api.LocalMode; - -public class InnerJoinApplicationTest -{ - @Test - public void testApplication() throws Exception - { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new InnerJoinApplication(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - Thread.sleep(10 * 1000); - lc.shutdown(); - } -} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/innerjoin/src/test/java/org/apache/apex/examples/innerjoin/InnerJoinApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/innerjoin/src/test/java/org/apache/apex/examples/innerjoin/InnerJoinApplicationTest.java b/examples/innerjoin/src/test/java/org/apache/apex/examples/innerjoin/InnerJoinApplicationTest.java new file mode 100644 index 0000000..81ae442 --- /dev/null +++ b/examples/innerjoin/src/test/java/org/apache/apex/examples/innerjoin/InnerJoinApplicationTest.java @@ -0,0 +1,21 @@ +package org.apache.apex.examples.innerjoin; + +import org.junit.Test; +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.LocalMode; + +public class InnerJoinApplicationTest +{ + @Test + public void testApplication() throws Exception + { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new InnerJoinApplication(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + Thread.sleep(10 * 1000); + lc.shutdown(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/pom.xml ---------------------------------------------------------------------- diff --git a/examples/parser/pom.xml b/examples/parser/pom.xml index 4eefe75..ec03cae 100644 --- a/examples/parser/pom.xml +++ b/examples/parser/pom.xml @@ -1,261 +1,50 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>parser</artifactId> + + <parent> + 
<groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-parser</artifactId> <packaging>jar</packaging> <!-- change these to the appropriate values --> <name>Parser Apps</name> <description>Applications to showcase different parsers</description> - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - <malhar.version>3.7.0-SNAPSHOT</malhar.version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - 
</descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - 
</artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${pmalhar.versioroject.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - 
<phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - <dependencies> - <!-- add your dependencies here --> <dependency> <groupId>org.apache.apex</groupId> <artifactId>malhar-library</artifactId> - <version>${malhar.version}</version> - <!-- + <version>${project.version}</version> + <!-- If you know that your application does not need transitive dependencies pulled in by malhar-library, uncomment the following to reduce the size of your app package. --> - <!-- <exclusions> <exclusion> <groupId>*</groupId> <artifactId>*</artifactId> </exclusion> </exclusions> - --> </dependency> <dependency> <groupId>org.apache.apex</groupId> <artifactId>malhar-contrib</artifactId> - <version>${malhar.version}</version> + <version>${project.version}</version> + <!-- + If you know that your application does not need transitive dependencies pulled in by malhar-library, + uncomment the following to reduce the size of your app package. 
+ --> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> @@ -288,22 +77,9 @@ </dependency> <dependency> <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> + <version>${apex.core.version}</version> <scope>provided</scope> </dependency> </dependencies> - </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/Application.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/Application.java b/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/Application.java deleted file mode 100644 index e75541d..0000000 --- a/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/Application.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.datatorrent.tutorial.jsonparser; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.parser.JsonParser; -import com.datatorrent.lib.formatter.JsonFormatter; -import com.datatorrent.lib.io.ConsoleOutputOperator; - -@ApplicationAnnotation(name = "JsonProcessor") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - - JsonGenerator generator = dag.addOperator("JsonGenerator", 
JsonGenerator.class); - JsonParser parser = dag.addOperator("JsonParser", JsonParser.class); - JsonFormatter formatter = dag.addOperator("JsonFormatter", JsonFormatter.class); - - ConsoleOutputOperator jsonString = dag.addOperator("JsonString", ConsoleOutputOperator.class); - ConsoleOutputOperator jsonObject = dag.addOperator("JsonObject", ConsoleOutputOperator.class); - ConsoleOutputOperator error = dag.addOperator("Error", ConsoleOutputOperator.class); - - dag.addStream("json", generator.out, parser.in); - dag.addStream("pojo", parser.out, formatter.in); - dag.addStream("jsonString", formatter.out, jsonString.input); - dag.addStream("jsonObject", parser.parsedOutput, jsonObject.input); - dag.addStream("error", parser.err, error.input); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/Campaign.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/Campaign.java b/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/Campaign.java deleted file mode 100644 index f0245eb..0000000 --- a/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/Campaign.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.datatorrent.tutorial.jsonparser; - -import java.util.Date; - -import com.fasterxml.jackson.annotation.JsonFormat; -import com.fasterxml.jackson.annotation.JsonProperty; - -public class Campaign -{ - private int adId; - private String campaignName; - @JsonProperty("budget") - private double campaignBudget; - private boolean weatherTargeting; - @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy/MM/dd") - private Date startDate; - - public int getAdId() - { - return adId; - } - - public void setAdId(int adId) - { - this.adId = adId; - } - - public String getCampaignName() - { - return campaignName; - } - - public void setCampaignName(String campaignName) - { 
- this.campaignName = campaignName; - } - - public double getCampaignBudget() - { - return campaignBudget; - } - - public void setCampaignBudget(double campaignBudget) - { - this.campaignBudget = campaignBudget; - } - - public boolean isWeatherTargeting() - { - return weatherTargeting; - } - - public void setWeatherTargeting(boolean weatherTargeting) - { - this.weatherTargeting = weatherTargeting; - } - - public Date getStartDate() - { - return startDate; - } - - public void setStartDate(Date startDate) - { - this.startDate = startDate; - } - - @Override - public String toString() - { - return "Campaign [adId=" + adId + ", campaignName=" + campaignName + ", campaignBudget=" + campaignBudget - + ", weatherTargeting=" + weatherTargeting + ", startDate=" + startDate + "]"; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/JsonGenerator.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/JsonGenerator.java b/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/JsonGenerator.java deleted file mode 100644 index 90beb13..0000000 --- a/examples/parser/src/main/java/com/datatorrent/tutorial/jsonparser/JsonGenerator.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.datatorrent.tutorial.jsonparser; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Random; - -import javax.validation.constraints.Min; - -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; - -public class JsonGenerator extends BaseOperator implements InputOperator -{ - - private static final Logger LOG = 
LoggerFactory.getLogger(JsonGenerator.class); - - @Min(1) - private int numTuples = 20; - private transient int count = 0; - - public static Random rand = new Random(); - public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd"); - public static int[] adId = { 1, 2, 3, 4, 5 }; - public static String[] campaignName = { "cmp1", "cmp2", "cmp3", "cmp4" }; - public static double[] campaignBudget = { 10000.0, 20000.0, 300000.0 }; - public static boolean[] weatherTargeting = { true, false }; - private int sleepTime; - - public final transient DefaultOutputPort<byte[]> out = new DefaultOutputPort<byte[]>(); - - private static String getNext(int num) - { - - JSONObject obj = new JSONObject(); - try { - obj.put("adId", adId[num % adId.length]); - obj.put("campaignName", campaignName[num % campaignName.length]); - obj.put("campaignBudget", campaignBudget[num % campaignBudget.length]); - obj.put("weatherTargeting", weatherTargeting[num % weatherTargeting.length]); - obj.put("startDate", sdf.format(new Date())); - } catch (JSONException e) { - return null; - } - return obj.toString(); - } - - @Override - public void beginWindow(long windowId) - { - count = 0; - } - - @Override - public void emitTuples() - { - if (count++ < numTuples) { - out.emit(getNext(rand.nextInt(numTuples) + 1).getBytes()); - } else { - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted"); - } - } - } - - public int getNumTuples() - { - return numTuples; - } - - public void setNumTuples(int numTuples) - { - this.numTuples = numTuples; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/Application.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/Application.java 
b/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/Application.java new file mode 100644 index 0000000..34b64a3 --- /dev/null +++ b/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/Application.java @@ -0,0 +1,35 @@ +package org.apache.apex.examples.parser.jsonparser; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.parser.JsonParser; +import com.datatorrent.lib.formatter.JsonFormatter; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +@ApplicationAnnotation(name = "JsonProcessor") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + + JsonGenerator generator = dag.addOperator("JsonGenerator", JsonGenerator.class); + JsonParser parser = dag.addOperator("JsonParser", JsonParser.class); + JsonFormatter formatter = dag.addOperator("JsonFormatter", JsonFormatter.class); + + ConsoleOutputOperator jsonString = dag.addOperator("JsonString", ConsoleOutputOperator.class); + ConsoleOutputOperator jsonObject = dag.addOperator("JsonObject", ConsoleOutputOperator.class); + ConsoleOutputOperator error = dag.addOperator("Error", ConsoleOutputOperator.class); + + dag.addStream("json", generator.out, parser.in); + dag.addStream("pojo", parser.out, formatter.in); + dag.addStream("jsonString", formatter.out, jsonString.input); + dag.addStream("jsonObject", parser.parsedOutput, jsonObject.input); + dag.addStream("error", parser.err, error.input); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/Campaign.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/Campaign.java 
b/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/Campaign.java new file mode 100644 index 0000000..8946f5d --- /dev/null +++ b/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/Campaign.java @@ -0,0 +1,74 @@ +package org.apache.apex.examples.parser.jsonparser; + +import java.util.Date; + +import com.fasterxml.jackson.annotation.JsonFormat; +import com.fasterxml.jackson.annotation.JsonProperty; + +public class Campaign +{ + private int adId; + private String campaignName; + @JsonProperty("budget") + private double campaignBudget; + private boolean weatherTargeting; + @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy/MM/dd") + private Date startDate; + + public int getAdId() + { + return adId; + } + + public void setAdId(int adId) + { + this.adId = adId; + } + + public String getCampaignName() + { + return campaignName; + } + + public void setCampaignName(String campaignName) + { + this.campaignName = campaignName; + } + + public double getCampaignBudget() + { + return campaignBudget; + } + + public void setCampaignBudget(double campaignBudget) + { + this.campaignBudget = campaignBudget; + } + + public boolean isWeatherTargeting() + { + return weatherTargeting; + } + + public void setWeatherTargeting(boolean weatherTargeting) + { + this.weatherTargeting = weatherTargeting; + } + + public Date getStartDate() + { + return startDate; + } + + public void setStartDate(Date startDate) + { + this.startDate = startDate; + } + + @Override + public String toString() + { + return "Campaign [adId=" + adId + ", campaignName=" + campaignName + ", campaignBudget=" + campaignBudget + + ", weatherTargeting=" + weatherTargeting + ", startDate=" + startDate + "]"; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/JsonGenerator.java ---------------------------------------------------------------------- diff --git 
a/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/JsonGenerator.java b/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/JsonGenerator.java new file mode 100644 index 0000000..a0e3611 --- /dev/null +++ b/examples/parser/src/main/java/org/apache/apex/examples/parser/jsonparser/JsonGenerator.java @@ -0,0 +1,83 @@ +package org.apache.apex.examples.parser.jsonparser; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.Random; + +import javax.validation.constraints.Min; + +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +public class JsonGenerator extends BaseOperator implements InputOperator +{ + + private static final Logger LOG = LoggerFactory.getLogger(JsonGenerator.class); + + @Min(1) + private int numTuples = 20; + private transient int count = 0; + + public static Random rand = new Random(); + public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd"); + public static int[] adId = { 1, 2, 3, 4, 5 }; + public static String[] campaignName = { "cmp1", "cmp2", "cmp3", "cmp4" }; + public static double[] campaignBudget = { 10000.0, 20000.0, 300000.0 }; + public static boolean[] weatherTargeting = { true, false }; + private int sleepTime; + + public final transient DefaultOutputPort<byte[]> out = new DefaultOutputPort<byte[]>(); + + private static String getNext(int num) + { + + JSONObject obj = new JSONObject(); + try { + obj.put("adId", adId[num % adId.length]); + obj.put("campaignName", campaignName[num % campaignName.length]); + obj.put("campaignBudget", campaignBudget[num % campaignBudget.length]); + obj.put("weatherTargeting", weatherTargeting[num % weatherTargeting.length]); + obj.put("startDate", sdf.format(new Date())); + } 
catch (JSONException e) { + return null; + } + return obj.toString(); + } + + @Override + public void beginWindow(long windowId) + { + count = 0; + } + + @Override + public void emitTuples() + { + if (count++ < numTuples) { + out.emit(getNext(rand.nextInt(numTuples) + 1).getBytes()); + } else { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted"); + } + } + } + + public int getNumTuples() + { + return numTuples; + } + + public void setNumTuples(int numTuples) + { + this.numTuples = numTuples; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/parser/src/main/resources/META-INF/properties.xml b/examples/parser/src/main/resources/META-INF/properties.xml index d8ac178..2cd3c04 100644 --- a/examples/parser/src/main/resources/META-INF/properties.xml +++ b/examples/parser/src/main/resources/META-INF/properties.xml @@ -15,12 +15,12 @@ <property> <name>dt.application.JsonProcessor.operator.JsonParser.port.out.attr.TUPLE_CLASS </name> - <value>com.datatorrent.tutorial.jsonparser.Campaign</value> + <value>org.apache.apex.examples.parser.jsonparser.Campaign</value> </property> <property> <name>dt.application.JsonProcessor.operator.JsonFormatter.port.in.attr.TUPLE_CLASS </name> - <value>com.datatorrent.tutorial.jsonparser.Campaign</value> + <value>org.apache.apex.examples.parser.jsonparser.Campaign</value> </property> <property> <name>dt.application.JsonProcessor.operator.JsonString.prop.stringFormat http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/src/test/java/com/datatorrent/tutorial/jsonparser/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/test/java/com/datatorrent/tutorial/jsonparser/ApplicationTest.java 
b/examples/parser/src/test/java/com/datatorrent/tutorial/jsonparser/ApplicationTest.java deleted file mode 100644 index 390d4cb..0000000 --- a/examples/parser/src/test/java/com/datatorrent/tutorial/jsonparser/ApplicationTest.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.datatorrent.tutorial.jsonparser; - -import java.io.IOException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; -import org.junit.Test; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.tutorial.jsonparser.Application; - -/** - * Test the DAG declaration in local mode. - */ -public class ApplicationTest -{ - - @Test - public void testApplication() throws IOException, Exception - { - - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10 * 1000); // runs for 30 seconds and quits - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/parser/src/test/java/org/apache/apex/examples/parser/jsonparser/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/parser/src/test/java/org/apache/apex/examples/parser/jsonparser/ApplicationTest.java b/examples/parser/src/test/java/org/apache/apex/examples/parser/jsonparser/ApplicationTest.java new file mode 100644 index 0000000..6f1a6f6 --- /dev/null +++ b/examples/parser/src/test/java/org/apache/apex/examples/parser/jsonparser/ApplicationTest.java @@ -0,0 +1,35 @@ +package org.apache.apex.examples.parser.jsonparser; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; 
+import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. + */ +public class ApplicationTest +{ + + @Test + public void testApplication() throws IOException, Exception + { + + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10 * 1000); // runs for 30 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/partition/pom.xml ---------------------------------------------------------------------- diff --git a/examples/partition/pom.xml b/examples/partition/pom.xml index ac15981..cafdcd7 100644 --- a/examples/partition/pom.xml +++ b/examples/partition/pom.xml @@ -1,274 +1,18 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>Test-Ram</artifactId> - <packaging>jar</packaging> - - <!-- change these to the appropriate values --> - <name>Test_ram</name> - <description>Test_ram</description> - - <properties> - <!-- change this if you desire to use a different version of Apex core --> - <apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> 
- <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - 
<configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - 
<docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - - <dependencies> - <!-- add your dependencies here --> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>3.6.0</version> + 
<artifactId>malhar-examples-partition</artifactId> + <packaging>jar</packaging> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package. - --> - <!-- - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - --> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> - </dependency> - </dependencies> + <!-- change these to the appropriate values --> + <name>Partition</name> + <description>malhar-examples-partition</description> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/partition/src/main/java/com/example/myapexapp/Application.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/com/example/myapexapp/Application.java b/examples/partition/src/main/java/com/example/myapexapp/Application.java deleted file mode 100644 index e1ca2ff..0000000 --- a/examples/partition/src/main/java/com/example/myapexapp/Application.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.example.myapexapp; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DAG.Locality; -import com.datatorrent.lib.io.ConsoleOutputOperator; - -@ApplicationAnnotation(name="TestStuff") -public class 
Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - RandomNumberGenerator random = dag.addOperator("randomInt", RandomNumberGenerator.class); - TestPartition testPartition = dag.addOperator("testPartition", TestPartition.class); - Codec3 codec = new Codec3(); - dag.setInputPortAttribute(testPartition.in, PortContext.STREAM_CODEC, codec); - - //Add locality if needed, e.g.: .setLocality(Locality.CONTAINER_LOCAL); - dag.addStream("randomData", random.out, testPartition.in); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/partition/src/main/java/com/example/myapexapp/Codec3.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/com/example/myapexapp/Codec3.java b/examples/partition/src/main/java/com/example/myapexapp/Codec3.java deleted file mode 100644 index 2754e9b..0000000 --- a/examples/partition/src/main/java/com/example/myapexapp/Codec3.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.example.myapexapp; - -import com.datatorrent.lib.codec.KryoSerializableStreamCodec; - -public class Codec3 extends KryoSerializableStreamCodec<Integer> { - @Override - public int getPartition(Integer tuple) { - final int v = tuple; - return (1 == (v & 1)) ? 0 // odd - : (0 == (v & 3)) ? 
1 // divisible by 4 - : 2; // divisible by 2 but not 4 - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java deleted file mode 100644 index de2797b..0000000 --- a/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.example.myapexapp; - -import java.util.Random; - -import javax.validation.constraints.Min; -import javax.validation.constraints.Size; -import javax.validation.ConstraintViolation; -import javax.validation.ValidatorFactory; -import javax.validation.Validator; -import javax.validation.Validation; - -import com.datatorrent.api.Attribute; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.Context; -import com.datatorrent.api.Context.OperatorContext; - -/** - * This is a simple operator that emits random integer. 
- */ -public class RandomNumberGenerator extends BaseOperator implements InputOperator -{ - private static final Logger LOG = LoggerFactory.getLogger(RandomNumberGenerator.class); - - @Min(1) - private int numTuples = 20; - private transient int count = 0; - - private int sleepTime; - private transient long curWindowId; - private transient Random random = new Random(); - - public final transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>(); - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - - long appWindowId = context.getValue(context.ACTIVATION_WINDOW_ID); - sleepTime = context.getValue(context.SPIN_MILLIS); - LOG.debug("Started setup, appWindowId = {}, sleepTime = {}", appWindowId, sleepTime); - } - - @Override - public void beginWindow(long windowId) - { - count = 0; - LOG.debug("beginWindow: windowId = {}", windowId); - } - - @Override - public void emitTuples() - { - if (count++ < numTuples) { - out.emit(random.nextInt()); - } else { - LOG.debug("count = {}, time = {}", count, System.currentTimeMillis()); - - try { - // avoid repeated calls to this function - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted"); - } - } - } - - public int getNumTuples() - { - return numTuples; - } - - public void setNumTuples(int numTuples) - { - this.numTuples = numTuples; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java b/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java deleted file mode 100644 index 1f77e72..0000000 --- a/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java +++ /dev/null @@ -1,164 +0,0 @@ -package com.example.myapexapp; - -import java.util.ArrayList; 
-import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Random; - -import javax.validation.constraints.Max; -import javax.validation.constraints.Min; -import javax.validation.constraints.Size; -import javax.validation.ConstraintViolation; -import javax.validation.ValidatorFactory; -import javax.validation.Validator; -import javax.validation.Validation; - -import com.datatorrent.api.Attribute; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.Context; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.DefaultPartition; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.Partitioner; -import com.datatorrent.api.Partitioner.Partition; -import com.datatorrent.api.Partitioner.PartitionKeys; -import com.datatorrent.api.Partitioner.PartitioningContext; - -import com.datatorrent.common.util.BaseOperator; - -/** - * Simple operator to test partitioning - */ -public class TestPartition extends BaseOperator implements Partitioner<TestPartition> -{ - private static final Logger LOG = LoggerFactory.getLogger(TestPartition.class); - - private transient int id; // operator/partition id - private transient long curWindowId; // current window id - private transient long cnt; // per-window tuple count - - @Min(1) @Max(20) - private int nPartitions = 3; - - public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>() { - @Override - public void process(Integer tuple) - { - LOG.debug("{}: tuple = {}, operator id = {}", cnt, tuple, id); - ++cnt; - } - }; - - //public final transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>(); - - @Override - public void setup(Context.OperatorContext context) - { - super.setup(context); - - long appWindowId 
= context.getValue(context.ACTIVATION_WINDOW_ID); - id = context.getId(); - LOG.debug("Started setup, appWindowId = {}, operator id = {}", appWindowId, id); - } - - @Override - public void beginWindow(long windowId) - { - cnt = 0; - curWindowId = windowId; - LOG.debug("window id = {}, operator id = {}", curWindowId, id); - } - - @Override - public void endWindow() - { - LOG.debug("window id = {}, operator id = {}, cnt = {}", curWindowId, id, cnt); - } - - @Override - public void partitioned(Map<Integer, Partition<TestPartition>> partitions) - { - //Do nothing - } - - @Override - public Collection<Partition<TestPartition>> definePartitions( - Collection<Partition<TestPartition>> partitions, - PartitioningContext context) - { - int oldSize = partitions.size(); - LOG.debug("partitionCount: current = {} requested = {}", oldSize, nPartitions); - - // each partition i in 0...nPartitions receives tuples divisible by i but not by any other - // j in that range; all other tuples ignored - // - if (3 != nPartitions) return getPartitions(partitions, context); - - // special case of 3 partitions: All odd numbers to partition 0; even numbers divisible - // by 4 to partition 1, those divisible by 2 but not 4 to partition 2. 
- - // mask used to extract discriminant from tuple hashcode - int mask = 0x03; - - Partition<TestPartition>[] newPartitions = new Partition[] { - new DefaultPartition<TestPartition>(new TestPartition()), - new DefaultPartition<TestPartition>(new TestPartition()), - new DefaultPartition<TestPartition>(new TestPartition()) }; - - HashSet<Integer>[] set - = new HashSet[] {new HashSet<>(), new HashSet<>(), new HashSet<>()}; - set[0].add(0); - set[1].add(1); - set[2].add(2); - - PartitionKeys[] keys = { - new PartitionKeys(mask, set[0]), - new PartitionKeys(mask, set[1]), - new PartitionKeys(mask, set[2]) }; - - for (int i = 0; i < 3; ++i ) { - Partition<TestPartition> partition = newPartitions[i]; - partition.getPartitionKeys().put(in, keys[i]); - } - - return new ArrayList<Partition<TestPartition>>(Arrays.asList(newPartitions)); - } // definePartitions - - private Collection<Partition<TestPartition>> getPartitions( - Collection<Partition<TestPartition>> partitions, - PartitioningContext context) - { - // create array of partitions to return - Collection<Partition<TestPartition>> result - = new ArrayList<Partition<TestPartition>>(nPartitions); - - int mask = getMask(nPartitions); - for (int i = 0; i < nPartitions; ++i) { - HashSet<Integer> set = new HashSet<>(); - set.add(i); - PartitionKeys keys = new PartitionKeys(mask, set); - Partition partition = new DefaultPartition<TestPartition>(new TestPartition()); - partition.getPartitionKeys().put(in, keys); - } - - return result; - } // getPartitions - - // return mask with bits 0..N set where N is the highest set bit of argument - private int getMask(final int n) { - return -1 >>> Integer.numberOfLeadingZeros(n); - } // getMask - - // accessors - public int getNPartitions() { return nPartitions; } - public void setNPartitions(int v) { nPartitions = v; } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/partition/src/main/java/org/apache/apex/examples/partition/Application.java 
---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/org/apache/apex/examples/partition/Application.java b/examples/partition/src/main/java/org/apache/apex/examples/partition/Application.java new file mode 100644 index 0000000..eb9a7c2 --- /dev/null +++ b/examples/partition/src/main/java/org/apache/apex/examples/partition/Application.java @@ -0,0 +1,25 @@ +package org.apache.apex.examples.partition; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG; + +@ApplicationAnnotation(name="TestStuff") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + RandomNumberGenerator random = dag.addOperator("randomInt", RandomNumberGenerator.class); + TestPartition testPartition = dag.addOperator("testPartition", TestPartition.class); + Codec3 codec = new Codec3(); + dag.setInputPortAttribute(testPartition.in, PortContext.STREAM_CODEC, codec); + + //Add locality if needed, e.g.: .setLocality(Locality.CONTAINER_LOCAL); + dag.addStream("randomData", random.out, testPartition.in); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/partition/src/main/java/org/apache/apex/examples/partition/Codec3.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/org/apache/apex/examples/partition/Codec3.java b/examples/partition/src/main/java/org/apache/apex/examples/partition/Codec3.java new file mode 100644 index 0000000..daa0e05 --- /dev/null +++ b/examples/partition/src/main/java/org/apache/apex/examples/partition/Codec3.java @@ -0,0 +1,13 @@ +package org.apache.apex.examples.partition; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; + +public 
class Codec3 extends KryoSerializableStreamCodec<Integer> { + @Override + public int getPartition(Integer tuple) { + final int v = tuple; + return (1 == (v & 1)) ? 0 // odd + : (0 == (v & 3)) ? 1 // divisible by 4 + : 2; // divisible by 2 but not 4 + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/partition/src/main/java/org/apache/apex/examples/partition/RandomNumberGenerator.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/org/apache/apex/examples/partition/RandomNumberGenerator.java b/examples/partition/src/main/java/org/apache/apex/examples/partition/RandomNumberGenerator.java new file mode 100644 index 0000000..e3959d1 --- /dev/null +++ b/examples/partition/src/main/java/org/apache/apex/examples/partition/RandomNumberGenerator.java @@ -0,0 +1,76 @@ +package org.apache.apex.examples.partition; + +import java.util.Random; + +import javax.validation.constraints.Min; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.Context; + +/** + * This is a simple operator that emits random integer. 
package org.apache.apex.examples.partition;

import java.util.Random;

import javax.validation.constraints.Min;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.common.util.BaseOperator;

/**
 * Input operator that emits up to {@code numTuples} random integers per
 * streaming window, then sleeps between calls to avoid busy-spinning in
 * {@link #emitTuples()}.
 */
public class RandomNumberGenerator extends BaseOperator implements InputOperator
{
  private static final Logger LOG = LoggerFactory.getLogger(RandomNumberGenerator.class);

  // Per-window emission quota; configurable via setter / properties.
  @Min(1)
  private int numTuples = 20;

  // Tuples emitted so far in the current window; reset in beginWindow().
  private transient int count = 0;

  // Milliseconds to sleep once the quota is reached (taken from SPIN_MILLIS).
  private int sleepTime;

  private transient Random random = new Random();

  public final transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>();

  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);

    // Access the constants through the type, not the instance.
    long appWindowId = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
    sleepTime = context.getValue(Context.OperatorContext.SPIN_MILLIS);
    LOG.debug("Started setup, appWindowId = {}, sleepTime = {}", appWindowId, sleepTime);
  }

  @Override
  public void beginWindow(long windowId)
  {
    count = 0;
    LOG.debug("beginWindow: windowId = {}", windowId);
  }

  @Override
  public void emitTuples()
  {
    if (count++ < numTuples) {
      out.emit(random.nextInt());
    } else {
      LOG.debug("count = {}, time = {}", count, System.currentTimeMillis());

      try {
        // Avoid repeated hot calls to this function once the quota is reached.
        Thread.sleep(sleepTime);
      } catch (InterruptedException e) {
        // Restore the interrupt status so the platform can shut down promptly.
        Thread.currentThread().interrupt();
        LOG.info("Sleep interrupted");
      }
    }
  }

  public int getNumTuples()
  {
    return numTuples;
  }

  public void setNumTuples(int numTuples)
  {
    this.numTuples = numTuples;
  }
}
package org.apache.apex.examples.partition;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;

import javax.validation.constraints.Max;
import javax.validation.constraints.Min;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;

import com.datatorrent.common.util.BaseOperator;

/**
 * Simple operator to test partitioning: counts and logs tuples per window,
 * and defines its own partitions via {@link #definePartitions}.
 */
public class TestPartition extends BaseOperator implements Partitioner<TestPartition>
{
  private static final Logger LOG = LoggerFactory.getLogger(TestPartition.class);

  private transient int id;          // operator/partition id
  private transient long curWindowId; // current window id
  private transient long cnt;        // per-window tuple count

  @Min(1) @Max(20)
  private int nPartitions = 3;

  public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>() {
    @Override
    public void process(Integer tuple)
    {
      LOG.debug("{}: tuple = {}, operator id = {}", cnt, tuple, id);
      ++cnt;
    }
  };

  @Override
  public void setup(Context.OperatorContext context)
  {
    super.setup(context);

    long appWindowId = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID);
    id = context.getId();
    LOG.debug("Started setup, appWindowId = {}, operator id = {}", appWindowId, id);
  }

  @Override
  public void beginWindow(long windowId)
  {
    cnt = 0;
    curWindowId = windowId;
    LOG.debug("window id = {}, operator id = {}", curWindowId, id);
  }

  @Override
  public void endWindow()
  {
    LOG.debug("window id = {}, operator id = {}, cnt = {}", curWindowId, id, cnt);
  }

  @Override
  public void partitioned(Map<Integer, Partition<TestPartition>> partitions)
  {
    //Do nothing
  }

  /**
   * Defines the partitioning scheme. For the special case of 3 partitions:
   * all odd numbers go to partition 0; even numbers divisible by 4 to
   * partition 1; those divisible by 2 but not 4 to partition 2 (the codec
   * produces exactly these discriminants). Any other count falls through to
   * the generic {@link #getPartitions} scheme.
   */
  @Override
  public Collection<Partition<TestPartition>> definePartitions(
      Collection<Partition<TestPartition>> partitions,
      PartitioningContext context)
  {
    int oldSize = partitions.size();
    LOG.debug("partitionCount: current = {} requested = {}", oldSize, nPartitions);

    if (3 != nPartitions) {
      return getPartitions(partitions, context);
    }

    // mask used to extract discriminant from tuple hashcode
    final int mask = 0x03;

    @SuppressWarnings("unchecked") // generic array creation; elements are all DefaultPartition<TestPartition>
    Partition<TestPartition>[] newPartitions = new Partition[] {
        new DefaultPartition<TestPartition>(new TestPartition()),
        new DefaultPartition<TestPartition>(new TestPartition()),
        new DefaultPartition<TestPartition>(new TestPartition()) };

    // one discriminant value per partition: 0 -> p0, 1 -> p1, 2 -> p2
    for (int i = 0; i < newPartitions.length; ++i) {
      HashSet<Integer> set = new HashSet<>();
      set.add(i);
      newPartitions[i].getPartitionKeys().put(in, new PartitionKeys(mask, set));
    }

    return new ArrayList<Partition<TestPartition>>(Arrays.asList(newPartitions));
  } // definePartitions

  /**
   * Generic scheme: partition i receives tuples whose masked discriminant
   * equals i; tuples whose discriminant exceeds nPartitions-1 are ignored.
   */
  private Collection<Partition<TestPartition>> getPartitions(
      Collection<Partition<TestPartition>> partitions,
      PartitioningContext context)
  {
    Collection<Partition<TestPartition>> result
        = new ArrayList<Partition<TestPartition>>(nPartitions);

    // NOTE(review): mask covers getMask(nPartitions), so when nPartitions is
    // not of the form 2^k - 1 some discriminants have no partition and those
    // tuples are dropped; this matches the original intent ("all other tuples
    // ignored") — confirm if full coverage is desired.
    int mask = getMask(nPartitions);
    for (int i = 0; i < nPartitions; ++i) {
      HashSet<Integer> set = new HashSet<>();
      set.add(i);
      PartitionKeys keys = new PartitionKeys(mask, set);
      Partition<TestPartition> partition = new DefaultPartition<TestPartition>(new TestPartition());
      partition.getPartitionKeys().put(in, keys);
      // BUGFIX: the partition was built but never added, so the method
      // previously returned an empty collection for nPartitions != 3.
      result.add(partition);
    }

    return result;
  } // getPartitions

  // return mask with bits 0..N set where N is the highest set bit of argument
  private int getMask(final int n)
  {
    return -1 >>> Integer.numberOfLeadingZeros(n);
  } // getMask

  // accessors
  public int getNPartitions()
  {
    return nPartitions;
  }

  public void setNPartitions(int v)
  {
    nPartitions = v;
  }
}
- */ -package com.example.myapexapp; - -import java.io.IOException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -import com.datatorrent.api.LocalMode; -import com.example.myapexapp.Application; - -/** - * Test the DAG declaration in local mode. - */ -public class ApplicationTest { - - @Test - public void testApplication() throws IOException, Exception { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(5000); // runs for 5 seconds and quits - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/partition/src/test/java/org/apache/apex/examples/partition/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/test/java/org/apache/apex/examples/partition/ApplicationTest.java b/examples/partition/src/test/java/org/apache/apex/examples/partition/ApplicationTest.java new file mode 100644 index 0000000..4cfb3c2 --- /dev/null +++ b/examples/partition/src/test/java/org/apache/apex/examples/partition/ApplicationTest.java @@ -0,0 +1,36 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.partition; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class ApplicationTest { + + @Test + public void testApplication() throws IOException, Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(5000); // runs for 5 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/pom.xml ---------------------------------------------------------------------- diff --git a/examples/pom.xml b/examples/pom.xml index 30ce061..be0f0d3 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -187,6 +187,16 @@ <module>r</module> <module>echoserver</module> <module>iteration</module> + <module>csvformatter</module> + <module>dedup</module> + <module>dynamic-partition</module> + <module>enricher</module> + <module>filter</module> + <module>innerjoin</module> + <module>partition</module> + <module>recordReader</module> + <module>throttle</module> + <module>transform</module> </modules> <dependencies>
