http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/App.java ---------------------------------------------------------------------- diff --git a/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/App.java b/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/App.java new file mode 100644 index 0000000..1a40cb5 --- /dev/null +++ b/examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/App.java @@ -0,0 +1,23 @@ +package org.apache.apex.examples.dynamic; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import com.datatorrent.lib.stream.DevNull; + +@ApplicationAnnotation(name="Dyn") +public class App implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + Gen gen = dag.addOperator("gen", Gen.class); + DevNull devNull = dag.addOperator("devNull", DevNull.class); + + dag.addStream("data", gen.out, devNull.data); + } +}
// examples/dynamic-partition/src/main/java/org/apache/apex/examples/dynamic/Gen.java
package org.apache.apex.examples.dynamic;

import java.io.ByteArrayOutputStream;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import javax.validation.constraints.NotNull;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.commons.io.IOUtils;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Lists;

import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.util.BaseOperator;

/**
 * Input operator that emits random doubles and repartitions itself once: it starts with
 * 2 partitions and requests {@link #MAX_PARTITIONS} as soon as the emitted-tuple statistic
 * reported through {@link #processStats} ({@code getTuplesEmittedPSMA()}) exceeds 500.
 */
public class Gen extends BaseOperator implements InputOperator, Partitioner<Gen>, StatsListener
{
  private static final Logger LOG = LoggerFactory.getLogger(Gen.class);

  private static final int MAX_PARTITIONS = 4;    // maximum number of partitions

  private int partitions = 2;                     // initial number of partitions

  // NOTE(review): @NotNull can never fail on a primitive int; a range constraint was
  // probably intended -- confirm before changing, since it would alter validation.
  @NotNull
  private int numTuples;                          // number of tuples to emit per window

  private transient int count = 0;                // tuples emitted so far in the current window

  public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();

  /**
   * Sanity check after partitioning: the number of partitions handed back must equal the
   * number this operator asked for.
   *
   * @param map partitions keyed by integer id, as passed in by the caller
   * @throws RuntimeException if the sizes disagree
   */
  @Override
  public void partitioned(Map<Integer, Partition<Gen>> map)
  {
    if (partitions != map.size()) {
      String msg = String.format("partitions = %d, map.size = %d%n", partitions, map.size());
      throw new RuntimeException(msg);
    }
  }

  /**
   * Resets the per-window tuple counter.
   *
   * @param windowId id of the window that is starting; unused
   */
  @Override
  public void beginWindow(long windowId)
  {
    count = 0;
  }

  /**
   * Emits at most one random double per call, and no more than {@code numTuples} per window.
   */
  @Override
  public void emitTuples()
  {
    if (count < numTuples) {
      ++count;
      out.emit(Math.random());
    }
  }

  public int getNumTuples()
  {
    return numTuples;
  }

  /**
   * Sets the number of tuples to be emitted every window.
   * @param numTuples number of tuples
   */
  public void setNumTuples(int numTuples)
  {
    this.numTuples = numTuples;
  }

  /**
   * Requests a repartition to {@link #MAX_PARTITIONS} the first time the emitted-tuple
   * statistic exceeds 500. Because {@code partitions} is then set to the maximum, later
   * calls never request another repartition.
   *
   * @param batchedOperatorStats statistics for this operator
   * @return response whose {@code repartitionRequired} flag is set only when a repartition is wanted
   */
  @Override
  public Response processStats(BatchedOperatorStats batchedOperatorStats)
  {
    final long emittedCount = batchedOperatorStats.getTuplesEmittedPSMA();

    // we only perform a single dynamic repartition
    Response res = new Response();
    res.repartitionRequired = false;
    if (emittedCount > 500 && partitions < MAX_PARTITIONS) {
      LOG.info("processStats: trying repartition of input operator current {} required {}",
               partitions, MAX_PARTITIONS);
      LOG.info("**** operator id = {}, window id = {}, tuplesProcessedPSMA = {}, tuplesEmittedPSMA = {}",
               batchedOperatorStats.getOperatorId(),
               batchedOperatorStats.getCurrentWindowId(),
               batchedOperatorStats.getTuplesProcessedPSMA(),
               emittedCount);
      partitions = MAX_PARTITIONS;
      res.repartitionRequired = true;
    }

    return res;
  }  // processStats

  /**
   * Clone object by serializing and deserializing using Kryo.
   * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
   *
   * @param kryo kryo object used to clone objects
   * @param src src object that copy from
   * @return cloned object
   */
  @SuppressWarnings("unchecked")
  private static <SRC> SRC cloneObject(Kryo kryo, SRC src)
  {
    kryo.setClassLoader(src.getClass().getClassLoader());
    ByteArrayOutputStream bos = null;
    Output output = null;
    Input input = null;
    try {
      bos = new ByteArrayOutputStream();
      output = new Output(bos);
      kryo.writeObject(output, src);
      // close before reading the bytes back so anything still buffered in output reaches bos
      output.close();
      input = new Input(bos.toByteArray());
      return (SRC)kryo.readObject(input, src.getClass());
    } finally {
      IOUtils.closeQuietly(input);
      // also release the writer when writeObject() failed before the explicit close above
      IOUtils.closeQuietly(output);
      IOUtils.closeQuietly(bos);
    }
  }

  /**
   * Builds the partition collection. When the current partition count already equals
   * {@code partitions} the incoming collection is returned untouched; otherwise
   * {@code partitions} fresh partitions are created, each holding a Kryo clone of this
   * operator.
   *
   * @param list    current partitions
   * @param context partitioning context; unused
   * @return the partitions to use from now on
   * @throws RuntimeException if {@code partitions} is not positive
   */
  @Override
  public Collection<Partition<Gen>> definePartitions(
      Collection<Partition<Gen>> list, PartitioningContext context)
  {
    // zero is rejected as well: it would otherwise produce an empty partition collection
    if (partitions <= 0) {    // error
      String msg = String.format("Error: Bad value: partitions = %d%n", partitions);
      LOG.error(msg);
      throw new RuntimeException(msg);
    }

    final int prevCount = list.size();
    if (1 == prevCount) {    // initial call
      LOG.info("definePartitions: First call, prevCount = {}, partitions = {}",
               prevCount, partitions);
    }

    if (prevCount == partitions) {
      LOG.info("definePartitions: Nothing to do in definePartitions");
      return list;    // nothing to do
    }

    LOG.debug("definePartitions: Repartitioning from {} to {}", prevCount, partitions);

    Kryo kryo = new Kryo();

    // return value: a brand-new list of partitions; entries of the old list are not reused
    List<Partition<Gen>> newPartitions = Lists.newArrayListWithExpectedSize(partitions);

    for (int i = 0; i < partitions; i++) {
      Gen oper = cloneObject(kryo, this);
      newPartitions.add(new DefaultPartition<>(oper));
    }

    LOG.info("definePartition: returning {} partitions", newPartitions.size());
    return newPartitions;
  }

}
---------------------------------------------------------------------- diff --git a/examples/dynamic-partition/src/test/java/com/example/dynamic/ApplicationTest.java b/examples/dynamic-partition/src/test/java/com/example/dynamic/ApplicationTest.java deleted file mode 100644 index 788b9d3..0000000 --- a/examples/dynamic-partition/src/test/java/com/example/dynamic/ApplicationTest.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.example.dynamic; - -import java.io.IOException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -import com.datatorrent.api.LocalMode; -//import com.example.myapexapp.Application; - -/** - * Test the DAG declaration in local mode. - */ -public class ApplicationTest { - - @Test - public void testApplication() throws IOException, Exception { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new App(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); // runs for 10 seconds and quits - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/test/java/org/apache/apex/examples/dynamic/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/dynamic-partition/src/test/java/org/apache/apex/examples/dynamic/ApplicationTest.java b/examples/dynamic-partition/src/test/java/org/apache/apex/examples/dynamic/ApplicationTest.java new file mode 100644 index 0000000..a4c8076 --- /dev/null +++ b/examples/dynamic-partition/src/test/java/org/apache/apex/examples/dynamic/ApplicationTest.java @@ -0,0 +1,33 @@ +package org.apache.apex.examples.dynamic; + +import 
java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. + */ +public class ApplicationTest { + + @Test + public void testApplication() throws IOException, Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new App(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); // runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/pom.xml ---------------------------------------------------------------------- diff --git a/examples/enricher/pom.xml b/examples/enricher/pom.xml index a93bcf5..7a55d32 100644 --- a/examples/enricher/pom.xml +++ b/examples/enricher/pom.xml @@ -1,263 +1,25 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>enricher</artifactId> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-enricher</artifactId> <packaging>jar</packaging> <!-- change these to the appropriate values --> <name>Enricher</name> <description>Example Use of POJO Enricher</description> - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - 
<apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - <malhar.version>3.6.0</malhar.version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - 
<DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - 
</goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - 
</resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - <dependencies> - <!-- add your dependencies here --> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>${malhar.version}</version> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package. - --> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> <dependency> <groupId>org.apache.apex</groupId> <artifactId>malhar-contrib</artifactId> - <version>${malhar.version}</version> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package. - --> + <version>${project.version}</version> <exclusions> <exclusion> <groupId>*</groupId> @@ -265,27 +27,6 @@ </exclusion> </exclusions> </dependency> - - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> - </dependency> - <dependency> <groupId>com.github.fge</groupId> <artifactId>json-schema-validator</artifactId> @@ -303,6 +44,25 @@ <artifactId>janino</artifactId> <version>2.7.8</version> </dependency> + <dependency> + <groupId>org.codehaus.jackson</groupId> + <artifactId>jackson-mapper-asl</artifactId> + <version>1.9.13</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>0.13.1</version> + 
</dependency> + <dependency> + <groupId>org.codehaus.jettison</groupId> + <artifactId>jettison</artifactId> + <version>1.1</version> + </dependency> + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + <version>2.7.0</version> + </dependency> </dependencies> - </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/DataGenerator.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/com/example/myapexapp/DataGenerator.java b/examples/enricher/src/main/java/com/example/myapexapp/DataGenerator.java deleted file mode 100644 index 3afbb87..0000000 --- a/examples/enricher/src/main/java/com/example/myapexapp/DataGenerator.java +++ /dev/null @@ -1,94 +0,0 @@ -package com.example.myapexapp; - -import java.util.Random; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; - -/** - * Generates Subscriber Data: - * A Party Phone - * A Party IMEI - * A Party IMSI - * Circle Id - */ -public class DataGenerator extends BaseOperator implements InputOperator -{ - public static int NUM_CIRCLES = 10; - - private Random r; - private int count = 0; - private int limit = 1000; - - public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>(); - - @Override - public void setup(OperatorContext context) - { - r = new Random(System.currentTimeMillis()); - } - - @Override - public void beginWindow(long windowId) { - super.beginWindow(windowId); - count = 0; - } - - @Override - public void emitTuples() - { - if(count++ < limit) { - output.emit(getRecord()); - } - } - - private byte[] getRecord() - { - String phone = getRandomNumber(10); - String imsi = getHashInRange(phone, 15); - String imei = getHashInRange(imsi, 15); - String circleId = 
Math.abs(phone.hashCode()) % NUM_CIRCLES + ""; -// String record = MessageFormat.format(baseDataTemplate, phone, imsi, imei, circleId); - String record = "{" + - "\"phone\":\"" + phone + "\"," + - "\"imei\":\"" + imei+ "\"," + - "\"imsi\":\"" + imsi+ "\"," + - "\"circleId\":" + circleId + - "}"; - return record.getBytes(); - } - - private String getRandomNumber(int numDigits) - { - String retVal = (r.nextInt((9 - 1) + 1) + 1) + ""; - - for (int i = 0; i < numDigits - 1; i++) { - retVal += (r.nextInt((9 - 0) + 1) + 0); - } - return retVal; - } - - private String getHashInRange(String s, int n) - { - StringBuilder retVal = new StringBuilder(); - for (int i = 0, j = 0; i < n && j < s.length(); i++, j++) { - retVal.append(Math.abs(s.charAt(j) + "".hashCode()) % 10); - if (j == s.length() - 1) { - j = -1; - } - } - return retVal.toString(); - } - - public int getLimit() - { - return limit; - } - - public void setLimit(int limit) - { - this.limit = limit; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/EnricherAppWithJSONFile.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/com/example/myapexapp/EnricherAppWithJSONFile.java b/examples/enricher/src/main/java/com/example/myapexapp/EnricherAppWithJSONFile.java deleted file mode 100644 index a0dab64..0000000 --- a/examples/enricher/src/main/java/com/example/myapexapp/EnricherAppWithJSONFile.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.example.myapexapp; - -import java.util.ArrayList; - -import com.datatorrent.contrib.enrich.JsonFSLoader; -import com.datatorrent.contrib.enrich.POJOEnricher; -import com.datatorrent.contrib.parser.JsonParser; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import 
com.datatorrent.api.annotation.ApplicationAnnotation; - -@ApplicationAnnotation(name="EnricherAppWithJSONFile") -public class EnricherAppWithJSONFile implements StreamingApplication -{ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - DataGenerator dataGenerator = dag.addOperator("DataGenerator", DataGenerator.class); - JsonParser parser = dag.addOperator("Parser", JsonParser.class); - - /** - * FSLoader is used to configure Enricher backend. Property of FSLoader file which is fileName is set in - * properties.xml file. - * The format that is used to read the file is present as an example in resources/circleMapping.txt file. - */ - JsonFSLoader fsLoader = new JsonFSLoader(); - POJOEnricher enrich = dag.addOperator("Enrich", POJOEnricher.class); - enrich.setStore(fsLoader); - - ArrayList includeFields = new ArrayList(); - includeFields.add("circleName"); - ArrayList lookupFields = new ArrayList(); - lookupFields.add("circleId"); - - enrich.setIncludeFields(includeFields); - enrich.setLookupFields(lookupFields); - - ConsoleOutputOperator console = dag.addOperator("Console", ConsoleOutputOperator.class); - - dag.addStream("Parse", dataGenerator.output, parser.in); - dag.addStream("Enrich", parser.out, enrich.input); - dag.addStream("Console", enrich.output, console.input); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/LineOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/com/example/myapexapp/LineOutputOperator.java b/examples/enricher/src/main/java/com/example/myapexapp/LineOutputOperator.java deleted file mode 100644 index 3b7a298..0000000 --- a/examples/enricher/src/main/java/com/example/myapexapp/LineOutputOperator.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.example.myapexapp; - -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -import 
javax.validation.constraints.NotNull; - -import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; - -/** - * Converts each tuple to a string and writes it as a new line to the output file - */ -public class LineOutputOperator extends AbstractFileOutputOperator<Object> -{ - private static final String NL = System.lineSeparator(); - private static final Charset CS = StandardCharsets.UTF_8; - - @NotNull - private String baseName; - - @Override - public byte[] getBytesForTuple(Object t) { - String result = new String(t.toString().getBytes(), CS) + NL; - return result.getBytes(CS); - } - - @Override - protected String getFileName(Object tuple) { - return baseName; - } - - public String getBaseName() { return baseName; } - public void setBaseName(String v) { baseName = v; } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/POJO.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/com/example/myapexapp/POJO.java b/examples/enricher/src/main/java/com/example/myapexapp/POJO.java deleted file mode 100644 index 32845e8..0000000 --- a/examples/enricher/src/main/java/com/example/myapexapp/POJO.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.example.myapexapp; - -public class POJO -{ - private String phone; - private String imei; - private String imsi; - private int circleId; - - public String getPhone() - { - return phone; - } - - public void setPhone(String phone) - { - this.phone = phone; - } - - public String getImei() - { - return imei; - } - - public void setImei(String imei) - { - this.imei = imei; - } - - public String getImsi() - { - return imsi; - } - - public void setImsi(String imsi) - { - this.imsi = imsi; - } - - public int getCircleId() - { - return circleId; - } - - public void setCircleId(int circleId) - { - this.circleId = circleId; - } -} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/com/example/myapexapp/POJOEnriched.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/com/example/myapexapp/POJOEnriched.java b/examples/enricher/src/main/java/com/example/myapexapp/POJOEnriched.java deleted file mode 100644 index bed2cfb..0000000 --- a/examples/enricher/src/main/java/com/example/myapexapp/POJOEnriched.java +++ /dev/null @@ -1,71 +0,0 @@ -package com.example.myapexapp; - -public class POJOEnriched -{ - private String phone; - private String imei; - private String imsi; - private int circleId; - private String circleName; - - public String getPhone() - { - return phone; - } - - public void setPhone(String phone) - { - this.phone = phone; - } - - public String getImei() - { - return imei; - } - - public void setImei(String imei) - { - this.imei = imei; - } - - public String getImsi() - { - return imsi; - } - - public void setImsi(String imsi) - { - this.imsi = imsi; - } - - public int getCircleId() - { - return circleId; - } - - public void setCircleId(int circleId) - { - this.circleId = circleId; - } - - public String getCircleName() - { - return circleName; - } - - public void setCircleName(String circleName) - { - this.circleName = circleName; - } - - @Override public String toString() - { - return "POJOEnriched{" + - "phone='" + phone + '\'' + - ", imei='" + imei + '\'' + - ", imsi='" + imsi + '\'' + - ", circleId=" + circleId + - ", circleName='" + circleName + '\'' + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/DataGenerator.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/DataGenerator.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/DataGenerator.java 
new file mode 100644 index 0000000..2ba5567 --- /dev/null +++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/DataGenerator.java @@ -0,0 +1,94 @@ +package org.apache.apex.examples.enricher; + +import java.util.Random; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +/** + * Generates Subscriber Data: + * A Party Phone + * A Party IMEI + * A Party IMSI + * Circle Id + */ +public class DataGenerator extends BaseOperator implements InputOperator +{ + public static int NUM_CIRCLES = 10; + + private Random r; + private int count = 0; + private int limit = 1000; + + public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>(); + + @Override + public void setup(OperatorContext context) + { + r = new Random(System.currentTimeMillis()); + } + + @Override + public void beginWindow(long windowId) { + super.beginWindow(windowId); + count = 0; + } + + @Override + public void emitTuples() + { + if(count++ < limit) { + output.emit(getRecord()); + } + } + + private byte[] getRecord() + { + String phone = getRandomNumber(10); + String imsi = getHashInRange(phone, 15); + String imei = getHashInRange(imsi, 15); + String circleId = Math.abs(phone.hashCode()) % NUM_CIRCLES + ""; +// String record = MessageFormat.format(baseDataTemplate, phone, imsi, imei, circleId); + String record = "{" + + "\"phone\":\"" + phone + "\"," + + "\"imei\":\"" + imei+ "\"," + + "\"imsi\":\"" + imsi+ "\"," + + "\"circleId\":" + circleId + + "}"; + return record.getBytes(); + } + + private String getRandomNumber(int numDigits) + { + String retVal = (r.nextInt((9 - 1) + 1) + 1) + ""; + + for (int i = 0; i < numDigits - 1; i++) { + retVal += (r.nextInt((9 - 0) + 1) + 0); + } + return retVal; + } + + private String getHashInRange(String s, int n) + { + StringBuilder retVal = new StringBuilder(); + for (int i = 0, j = 0; i 
< n && j < s.length(); i++, j++) { + retVal.append(Math.abs(s.charAt(j) + "".hashCode()) % 10); + if (j == s.length() - 1) { + j = -1; + } + } + return retVal.toString(); + } + + public int getLimit() + { + return limit; + } + + public void setLimit(int limit) + { + this.limit = limit; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/EnricherAppWithJSONFile.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/EnricherAppWithJSONFile.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/EnricherAppWithJSONFile.java new file mode 100644 index 0000000..1a420c4 --- /dev/null +++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/EnricherAppWithJSONFile.java @@ -0,0 +1,47 @@ +package org.apache.apex.examples.enricher; + +import java.util.ArrayList; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.enrich.JsonFSLoader; +import com.datatorrent.contrib.enrich.POJOEnricher; +import com.datatorrent.contrib.parser.JsonParser; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +@ApplicationAnnotation(name="EnricherAppWithJSONFile") +public class EnricherAppWithJSONFile implements StreamingApplication +{ + @Override + public void populateDAG(DAG dag, Configuration conf) + { + DataGenerator dataGenerator = dag.addOperator("DataGenerator", DataGenerator.class); + JsonParser parser = dag.addOperator("Parser", JsonParser.class); + + /** + * FSLoader is used to configure Enricher backend. Property of FSLoader file which is fileName is set in + * properties.xml file. + * The format that is used to read the file is present as an example in resources/circleMapping.txt file. 
+ */ + JsonFSLoader fsLoader = new JsonFSLoader(); + POJOEnricher enrich = dag.addOperator("Enrich", POJOEnricher.class); + enrich.setStore(fsLoader); + + ArrayList includeFields = new ArrayList(); + includeFields.add("circleName"); + ArrayList lookupFields = new ArrayList(); + lookupFields.add("circleId"); + + enrich.setIncludeFields(includeFields); + enrich.setLookupFields(lookupFields); + + ConsoleOutputOperator console = dag.addOperator("Console", ConsoleOutputOperator.class); + + dag.addStream("Parse", dataGenerator.output, parser.in); + dag.addStream("Enrich", parser.out, enrich.input); + dag.addStream("Console", enrich.output, console.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/LineOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/LineOutputOperator.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/LineOutputOperator.java new file mode 100644 index 0000000..aca7df6 --- /dev/null +++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/LineOutputOperator.java @@ -0,0 +1,34 @@ +package org.apache.apex.examples.enricher; + +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * Converts each tuple to a string and writes it as a new line to the output file + */ +public class LineOutputOperator extends AbstractFileOutputOperator<Object> +{ + private static final String NL = System.lineSeparator(); + private static final Charset CS = StandardCharsets.UTF_8; + + @NotNull + private String baseName; + + @Override + public byte[] getBytesForTuple(Object t) { + String result = new String(t.toString().getBytes(), CS) + NL; + return result.getBytes(CS); + } + + @Override + 
protected String getFileName(Object tuple) { + return baseName; + } + + public String getBaseName() { return baseName; } + public void setBaseName(String v) { baseName = v; } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJO.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJO.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJO.java new file mode 100644 index 0000000..d48bd1a --- /dev/null +++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJO.java @@ -0,0 +1,49 @@ +package org.apache.apex.examples.enricher; + +public class POJO +{ + private String phone; + private String imei; + private String imsi; + private int circleId; + + public String getPhone() + { + return phone; + } + + public void setPhone(String phone) + { + this.phone = phone; + } + + public String getImei() + { + return imei; + } + + public void setImei(String imei) + { + this.imei = imei; + } + + public String getImsi() + { + return imsi; + } + + public void setImsi(String imsi) + { + this.imsi = imsi; + } + + public int getCircleId() + { + return circleId; + } + + public void setCircleId(int circleId) + { + this.circleId = circleId; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJOEnriched.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJOEnriched.java b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJOEnriched.java new file mode 100644 index 0000000..df08c7f --- /dev/null +++ b/examples/enricher/src/main/java/org/apache/apex/examples/enricher/POJOEnriched.java @@ -0,0 +1,71 @@ +package org.apache.apex.examples.enricher; + +public class 
POJOEnriched +{ + private String phone; + private String imei; + private String imsi; + private int circleId; + private String circleName; + + public String getPhone() + { + return phone; + } + + public void setPhone(String phone) + { + this.phone = phone; + } + + public String getImei() + { + return imei; + } + + public void setImei(String imei) + { + this.imei = imei; + } + + public String getImsi() + { + return imsi; + } + + public void setImsi(String imsi) + { + this.imsi = imsi; + } + + public int getCircleId() + { + return circleId; + } + + public void setCircleId(int circleId) + { + this.circleId = circleId; + } + + public String getCircleName() + { + return circleName; + } + + public void setCircleName(String circleName) + { + this.circleName = circleName; + } + + @Override public String toString() + { + return "POJOEnriched{" + + "phone='" + phone + '\'' + + ", imei='" + imei + '\'' + + ", imsi='" + imsi + '\'' + + ", circleId=" + circleId + + ", circleName='" + circleName + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/enricher/src/main/resources/META-INF/properties.xml b/examples/enricher/src/main/resources/META-INF/properties.xml index 9ecf899..543b99a 100644 --- a/examples/enricher/src/main/resources/META-INF/properties.xml +++ b/examples/enricher/src/main/resources/META-INF/properties.xml @@ -3,17 +3,17 @@ <!-- Parser --> <property> <name>dt.application.EnricherAppWithJSONFile.operator.Parser.port.out.attr.TUPLE_CLASS</name> - <value>com.example.myapexapp.POJO</value> + <value>org.apache.apex.examples.enricher.POJO</value> </property> <!-- Enrich --> <property> <name>dt.application.EnricherAppWithJSONFile.operator.Enrich.port.input.attr.TUPLE_CLASS</name> - <value>com.example.myapexapp.POJO</value> + <value>org.apache.apex.examples.enricher.POJO</value> 
</property> <property> <name>dt.application.EnricherAppWithJSONFile.operator.Enrich.port.output.attr.TUPLE_CLASS</name> - <value>com.example.myapexapp.POJOEnriched</value> + <value>org.apache.apex.examples.enricher.POJOEnriched</value> </property> <property> <name>dt.application.EnricherAppWithJSONFile.operator.Enrich.prop.store.fileName</name> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/test/java/com/example/myapexapp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/enricher/src/test/java/com/example/myapexapp/ApplicationTest.java deleted file mode 100644 index 4b04603..0000000 --- a/examples/enricher/src/test/java/com/example/myapexapp/ApplicationTest.java +++ /dev/null @@ -1,31 +0,0 @@ -package com.example.myapexapp; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -import com.datatorrent.api.LocalMode; - -/** - * Test the DAG declaration in local mode. 
- */ -public class ApplicationTest { - - @Test - public void testApplication() throws Exception { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new EnricherAppWithJSONFile(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); // runs for 10 seconds and quits - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/enricher/src/test/java/org/apache/apex/examples/enricher/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/enricher/src/test/java/org/apache/apex/examples/enricher/ApplicationTest.java b/examples/enricher/src/test/java/org/apache/apex/examples/enricher/ApplicationTest.java new file mode 100644 index 0000000..6b6698e --- /dev/null +++ b/examples/enricher/src/test/java/org/apache/apex/examples/enricher/ApplicationTest.java @@ -0,0 +1,31 @@ +package org.apache.apex.examples.enricher; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class ApplicationTest { + + @Test + public void testApplication() throws Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new EnricherAppWithJSONFile(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); // runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/filter/pom.xml b/examples/filter/pom.xml index 9407818..7ef038e 100644 --- a/examples/filter/pom.xml +++ b/examples/filter/pom.xml @@ -1,266 +1,26 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - - <groupId>com.datatorrent.tutorial</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>filter</artifactId> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-filter</artifactId> <packaging>jar</packaging> <!-- change these to the appropriate values --> <name>Filter Operator</name> <description>Apex application demonstrating filter operator</description> - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - <malhar.version>3.6.0</malhar.version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> 
- <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - 
</executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - 
<useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - <dependencies> - <!-- add your dependencies here --> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - 
<version>${malhar.version}</version> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package. - --> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>3.1</version> - <type>jar</type> - </dependency> <dependency> <groupId>org.apache.apex</groupId> <artifactId>malhar-contrib</artifactId> - <version>${malhar.version}</version> - <!-- + <version>${project.version}</version> + <!-- If you know that your application does not need transitive dependencies pulled in by malhar-library, uncomment the following to reduce the size of your app package. --> @@ -277,23 +37,10 @@ <version>2.4.0</version> <optional>true</optional> </dependency> - - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> <dependency> <groupId>org.apache.apex</groupId> <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> + <version>${apex.core.version}</version> <scope>test</scope> </dependency> <dependency> @@ -308,5 +55,4 @@ <version>2.7.8</version> </dependency> </dependencies> - </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/java/com/datatorrent/tutorial/filter/Application.java ---------------------------------------------------------------------- diff --git a/examples/filter/src/main/java/com/datatorrent/tutorial/filter/Application.java b/examples/filter/src/main/java/com/datatorrent/tutorial/filter/Application.java deleted file mode 100644 index 4ebb153..0000000 --- 
a/examples/filter/src/main/java/com/datatorrent/tutorial/filter/Application.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Copyright (c) 2016 DataTorrent, Inc. - * All rights reserved. - */ - -package com.datatorrent.tutorial.filter; - -import org.apache.apex.malhar.lib.fs.FSRecordReaderModule; -import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.formatter.CsvFormatter; -import com.datatorrent.contrib.parser.CsvParser; -import com.datatorrent.lib.filter.FilterOperator; - -/** - * Simple application illustrating filter operator - */ -@ApplicationAnnotation(name="FilterExample") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - - FSRecordReaderModule recordReader = dag.addModule("recordReader", FSRecordReaderModule.class); - CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class); - FilterOperator filterOperator = dag.addOperator("filterOperator", new FilterOperator()); - - CsvFormatter selectedFormatter = dag.addOperator("selectedFormatter", new CsvFormatter()); - CsvFormatter rejectedFormatter = dag.addOperator("rejectedFormatter", new CsvFormatter()); - - StringFileOutputOperator selectedOutput = dag.addOperator("selectedOutput", new StringFileOutputOperator()); - StringFileOutputOperator rejectedOutput = dag.addOperator("rejectedOutput", new StringFileOutputOperator()); - - dag.addStream("record", recordReader.records, csvParser.in); - dag.addStream("pojo", csvParser.out, filterOperator.input); - - dag.addStream("pojoSelected", filterOperator.truePort, selectedFormatter.in); - dag.addStream("pojoRejected", filterOperator.falsePort, rejectedFormatter.in); - - dag.addStream("csvSelected", selectedFormatter.out, 
selectedOutput.input); - dag.addStream("csvRejected", rejectedFormatter.out, rejectedOutput.input); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/java/com/datatorrent/tutorial/filter/TransactionPOJO.java ---------------------------------------------------------------------- diff --git a/examples/filter/src/main/java/com/datatorrent/tutorial/filter/TransactionPOJO.java b/examples/filter/src/main/java/com/datatorrent/tutorial/filter/TransactionPOJO.java deleted file mode 100644 index c0d8817..0000000 --- a/examples/filter/src/main/java/com/datatorrent/tutorial/filter/TransactionPOJO.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright (c) 2016 DataTorrent, Inc. - * All rights reserved. - */ - -package com.datatorrent.tutorial.filter; - -public class TransactionPOJO -{ - - private long trasactionId; - private double amount; - private long accountNumber; - private String type; - - public long getTrasactionId() - { - return trasactionId; - } - - public void setTrasactionId(long trasactionId) - { - this.trasactionId = trasactionId; - } - - public double getAmount() - { - return amount; - } - - public void setAmount(double amount) - { - this.amount = amount; - } - - public long getAccountNumber() - { - return accountNumber; - } - - public void setAccountNumber(long accountNumber) - { - this.accountNumber = accountNumber; - } - - public String getType() - { - return type; - } - - public void setType(String type) - { - this.type = type; - } - - @Override - public String toString() - { - return "TransactionPOJO [trasactionId=" + trasactionId + ", amount=" + amount + ", accountNumber=" + accountNumber - + ", type=" + type + "]"; - } - - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/java/org/apache/apex/examples/filter/Application.java ---------------------------------------------------------------------- diff --git 
a/examples/filter/src/main/java/org/apache/apex/examples/filter/Application.java b/examples/filter/src/main/java/org/apache/apex/examples/filter/Application.java new file mode 100644 index 0000000..5bbf159 --- /dev/null +++ b/examples/filter/src/main/java/org/apache/apex/examples/filter/Application.java @@ -0,0 +1,49 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. + * All rights reserved. + */ + +package org.apache.apex.examples.filter; + +import org.apache.apex.malhar.lib.fs.FSRecordReaderModule; +import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.formatter.CsvFormatter; +import com.datatorrent.contrib.parser.CsvParser; +import com.datatorrent.lib.filter.FilterOperator; + +/** + * Simple application illustrating filter operator + */ +@ApplicationAnnotation(name="FilterExample") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + + FSRecordReaderModule recordReader = dag.addModule("recordReader", FSRecordReaderModule.class); + CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class); + FilterOperator filterOperator = dag.addOperator("filterOperator", new FilterOperator()); + + CsvFormatter selectedFormatter = dag.addOperator("selectedFormatter", new CsvFormatter()); + CsvFormatter rejectedFormatter = dag.addOperator("rejectedFormatter", new CsvFormatter()); + + StringFileOutputOperator selectedOutput = dag.addOperator("selectedOutput", new StringFileOutputOperator()); + StringFileOutputOperator rejectedOutput = dag.addOperator("rejectedOutput", new StringFileOutputOperator()); + + dag.addStream("record", recordReader.records, csvParser.in); + dag.addStream("pojo", csvParser.out, filterOperator.input); + + 
dag.addStream("pojoSelected", filterOperator.truePort, selectedFormatter.in); + dag.addStream("pojoRejected", filterOperator.falsePort, rejectedFormatter.in); + + dag.addStream("csvSelected", selectedFormatter.out, selectedOutput.input); + dag.addStream("csvRejected", rejectedFormatter.out, rejectedOutput.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/java/org/apache/apex/examples/filter/TransactionPOJO.java ---------------------------------------------------------------------- diff --git a/examples/filter/src/main/java/org/apache/apex/examples/filter/TransactionPOJO.java b/examples/filter/src/main/java/org/apache/apex/examples/filter/TransactionPOJO.java new file mode 100644 index 0000000..4b0d7a9 --- /dev/null +++ b/examples/filter/src/main/java/org/apache/apex/examples/filter/TransactionPOJO.java @@ -0,0 +1,62 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. + * All rights reserved. + */ + +package org.apache.apex.examples.filter; + +public class TransactionPOJO +{ + + private long trasactionId; + private double amount; + private long accountNumber; + private String type; + + public long getTrasactionId() + { + return trasactionId; + } + + public void setTrasactionId(long trasactionId) + { + this.trasactionId = trasactionId; + } + + public double getAmount() + { + return amount; + } + + public void setAmount(double amount) + { + this.amount = amount; + } + + public long getAccountNumber() + { + return accountNumber; + } + + public void setAccountNumber(long accountNumber) + { + this.accountNumber = accountNumber; + } + + public String getType() + { + return type; + } + + public void setType(String type) + { + this.type = type; + } + + @Override + public String toString() + { + return "TransactionPOJO [trasactionId=" + trasactionId + ", amount=" + amount + ", accountNumber=" + accountNumber + + ", type=" + type + "]"; + } +} 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/filter/src/main/resources/META-INF/properties.xml b/examples/filter/src/main/resources/META-INF/properties.xml index 079fb3f..8c1a4a3 100644 --- a/examples/filter/src/main/resources/META-INF/properties.xml +++ b/examples/filter/src/main/resources/META-INF/properties.xml @@ -33,7 +33,7 @@ </property> <property> <name>dt.application.FilterExample.operator.csvParser.port.out.attr.TUPLE_CLASS</name> - <value>com.datatorrent.tutorial.filter.TransactionPOJO</value> + <value>org.apache.apex.examples.filter.TransactionPOJO</value> </property> <property> <name>dt.application.FilterExample.operator.selectedOutput.prop.maxIdleWindows</name> @@ -45,15 +45,15 @@ </property> <property> <name>dt.application.FilterExample.operator.filterOperator.port.input.attr.TUPLE_CLASS</name> - <value>com.datatorrent.tutorial.filter.TransactionPOJO</value> + <value>org.apache.apex.examples.filter.TransactionPOJO</value> </property> <property> <name>dt.application.FilterExample.operator.selectedFormatter.port.in.attr.TUPLE_CLASS</name> - <value>com.datatorrent.tutorial.filter.TransactionPOJO</value> + <value>org.apache.apex.examples.filter.TransactionPOJO</value> </property> <property> <name>dt.application.FilterExample.operator.rejectedFormatter.port.in.attr.TUPLE_CLASS</name> - <value>com.datatorrent.tutorial.filter.TransactionPOJO</value> + <value>org.apache.apex.examples.filter.TransactionPOJO</value> </property> <property> <name>dt.application.FilterExample.operator.filterOperator.prop.condition</name> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/test/java/com/datatorrent/tutorial/filter/ApplicationTest.java ---------------------------------------------------------------------- diff --git 
a/examples/filter/src/test/java/com/datatorrent/tutorial/filter/ApplicationTest.java b/examples/filter/src/test/java/com/datatorrent/tutorial/filter/ApplicationTest.java deleted file mode 100644 index a90e822..0000000 --- a/examples/filter/src/test/java/com/datatorrent/tutorial/filter/ApplicationTest.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Copyright (c) 2016 DataTorrent, Inc. - * All rights reserved. - */ - -package com.datatorrent.tutorial.filter; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.Callable; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.stram.StramLocalCluster; - -/** - * Test the DAG declaration in local mode. 
- */ -public class ApplicationTest -{ - private String outputDir; - - public static class TestMeta extends TestWatcher - { - public String baseDirectory; - - @Override - protected void starting(org.junit.runner.Description description) - { - this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName(); - } - - @Override - protected void finished(Description description) - { - super.finished(description); - try { - FileUtils.forceDelete(new File(baseDirectory)); - } catch (IOException e) { - e.printStackTrace(); - } - } - - } - - @Rule - public TestMeta testMeta = new TestMeta(); - - @Before - public void setup() throws Exception - { - outputDir = testMeta.baseDirectory + File.separator + "output"; - } - - @Test - public void testApplication() throws IOException, Exception - { - - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - conf.set("dt.application.FilterExample.operator.selectedOutput.prop.filePath", outputDir); - conf.set("dt.application.FilterExample.operator.rejectedOutput.prop.filePath", outputDir); - final File selectedfile = FileUtils.getFile(outputDir, "selected.txt_8.0"); - final File rejectedfile = FileUtils.getFile(outputDir, "rejected.txt_6.0"); - - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - - ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() - { - @Override - public Boolean call() throws Exception - { - if (selectedfile.exists() && rejectedfile.exists()) { - return true; - } - return false; - } - }); - - lc.run(40000); - Assert.assertTrue( - FileUtils.contentEquals( - FileUtils.getFile( - "src/main/resources/META-INF/selected_output.txt" - ),selectedfile)); - - Assert.assertTrue( - FileUtils.contentEquals( - FileUtils.getFile( - "src/main/resources/META-INF/rejected_output.txt" - ),rejectedfile)); - - } catch 
(ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/filter/src/test/java/org/apache/apex/examples/filter/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/filter/src/test/java/org/apache/apex/examples/filter/ApplicationTest.java b/examples/filter/src/test/java/org/apache/apex/examples/filter/ApplicationTest.java new file mode 100644 index 0000000..2170753 --- /dev/null +++ b/examples/filter/src/test/java/org/apache/apex/examples/filter/ApplicationTest.java @@ -0,0 +1,96 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. + * All rights reserved. + */ + +package org.apache.apex.examples.filter; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.Callable; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; +import com.datatorrent.stram.StramLocalCluster; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class ApplicationTest +{ + private String outputDir; + + public static class TestMeta extends TestWatcher + { + public String baseDirectory; + + @Override + protected void starting(org.junit.runner.Description description) + { + this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName(); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + try { + FileUtils.forceDelete(new File(baseDirectory)); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Before + public void setup() throws Exception + { + outputDir = testMeta.baseDirectory + File.separator + "output"; + } + + @Test + public void testApplication() throws IOException, Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + conf.set("dt.application.FilterExample.operator.selectedOutput.prop.filePath", outputDir); + conf.set("dt.application.FilterExample.operator.rejectedOutput.prop.filePath", outputDir); + final File selectedfile = FileUtils.getFile(outputDir, "selected.txt_8.0"); + final File rejectedfile = FileUtils.getFile(outputDir, "rejected.txt_6.0"); + + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + + ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() + { + @Override + public Boolean call() throws Exception + { + if (selectedfile.exists() && rejectedfile.exists()) { + return true; + } + return false; + } + }); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/innerjoin/pom.xml ---------------------------------------------------------------------- diff --git 
a/examples/innerjoin/pom.xml b/examples/innerjoin/pom.xml index 29c1b90..78193ca 100644 --- a/examples/innerjoin/pom.xml +++ b/examples/innerjoin/pom.xml @@ -2,266 +2,49 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>innerjoin</artifactId> + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-innerjoin</artifactId> <packaging>jar</packaging> <name>Inner Join Application</name> <description>Sample Application for Inner Join</description> - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <malhar.version>3.6.0</malhar.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - 
<includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - 
<goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - 
<include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - <dependencies> - <!-- add your dependencies here --> <dependency> <groupId>org.apache.apex</groupId> <artifactId>malhar-library</artifactId> - <version>${malhar.version}</version> + <version>${project.version}</version> + <!-- + Transitive dependencies pulled in by malhar-library are excluded below to reduce the size of the app package; + remove the exclusions if your application needs any of them. 
+ --> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <version>2.14.1</version> </dependency> <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.9.1</version> </dependency> <dependency> <groupId>org.codehaus.janino</groupId> - <artifactId>janino</artifactId> + <artifactId>commons-compiler</artifactId> <version>2.7.8</version> + <type>jar</type> </dependency> </dependencies> - </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/innerjoin/src/main/java/com/example/join/InnerJoinApplication.java ---------------------------------------------------------------------- diff --git a/examples/innerjoin/src/main/java/com/example/join/InnerJoinApplication.java b/examples/innerjoin/src/main/java/com/example/join/InnerJoinApplication.java deleted file mode 100644 index 0b2f663..0000000 --- a/examples/innerjoin/src/main/java/com/example/join/InnerJoinApplication.java +++ /dev/null @@ -1,39 +0,0 @@ -package com.example.join; - -import org.apache.apex.malhar.lib.join.POJOInnerJoinOperator; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.partitioner.StatelessPartitioner; -import 
com.datatorrent.lib.io.ConsoleOutputOperator; - -@ApplicationAnnotation(name="InnerJoinExample") -public class InnerJoinApplication implements StreamingApplication -{ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // SalesEvent Generator - POJOGenerator salesGenerator = dag.addOperator("Input1", new POJOGenerator()); - // ProductEvent Generator - POJOGenerator productGenerator = dag.addOperator("Input2", new POJOGenerator()); - productGenerator.setSalesEvent(false); - - // Inner join Operator - POJOInnerJoinOperator join = dag.addOperator("Join", new POJOInnerJoinOperator()); - ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); - - // Streams - dag.addStream("SalesToJoin", salesGenerator.output, join.input1); - dag.addStream("ProductToJoin", productGenerator.output, join.input2); - dag.addStream("JoinToConsole", join.outputPort, output.input); - - // Setting tuple class properties to the ports of join operator - dag.setInputPortAttribute(join.input1, Context.PortContext.TUPLE_CLASS, POJOGenerator.SalesEvent.class); - dag.setInputPortAttribute(join.input2, Context.PortContext.TUPLE_CLASS, POJOGenerator.ProductEvent.class); - dag.setOutputPortAttribute(join.outputPort,Context.PortContext.TUPLE_CLASS, POJOGenerator.SalesEvent.class); - } -}
