APEXMALHAR-2233 Updated the examples to follow the structure of apex-malhar examples. Specified dependencies in pom.xmls of individual examples correctly.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9c154f20 Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9c154f20 Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9c154f20 Branch: refs/heads/master Commit: 9c154f204042a9e1974c2466e8783505b2c6da03 Parents: 8e20097 Author: Lakshmi Prasanna Velineni <[email protected]> Authored: Sun Mar 19 22:40:04 2017 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Sun Mar 26 11:43:48 2017 -0700 ---------------------------------------------------------------------- examples/csvformatter/pom.xml | 299 +++---------------- .../java/com/demo/myapexapp/Application.java | 45 --- .../com/demo/myapexapp/HDFSOutputOperator.java | 87 ------ .../java/com/demo/myapexapp/JsonGenerator.java | 78 ----- .../main/java/com/demo/myapexapp/PojoEvent.java | 141 --------- .../apex/examples/csvformatter/Application.java | 37 +++ .../csvformatter/HDFSOutputOperator.java | 87 ++++++ .../examples/csvformatter/JsonGenerator.java | 76 +++++ .../apex/examples/csvformatter/PojoEvent.java | 141 +++++++++ .../src/main/resources/META-INF/properties.xml | 2 +- .../com/demo/myapexapp/ApplicationTest.java | 67 ----- .../examples/csvformatter/ApplicationTest.java | 65 ++++ examples/dedup/pom.xml | 277 +---------------- .../java/com/example/dedup/Application.java | 123 -------- .../apache/apex/examples/dedup/Application.java | 122 ++++++++ .../java/com/example/dedup/ApplicationTest.java | 38 --- .../apex/examples/dedup/ApplicationTest.java | 37 +++ examples/dynamic-partition/pom.xml | 270 +---------------- .../src/main/java/com/example/dynamic/App.java | 23 -- .../src/main/java/com/example/dynamic/Gen.java | 169 ----------- .../org/apache/apex/examples/dynamic/App.java | 23 ++ .../org/apache/apex/examples/dynamic/Gen.java | 171 +++++++++++ .../com/example/dynamic/ApplicationTest.java | 34 --- 
.../apex/examples/dynamic/ApplicationTest.java | 33 ++ examples/enricher/pom.xml | 298 ++---------------- .../com/example/myapexapp/DataGenerator.java | 94 ------ .../myapexapp/EnricherAppWithJSONFile.java | 47 --- .../example/myapexapp/LineOutputOperator.java | 34 --- .../main/java/com/example/myapexapp/POJO.java | 49 --- .../com/example/myapexapp/POJOEnriched.java | 71 ----- .../apex/examples/enricher/DataGenerator.java | 94 ++++++ .../enricher/EnricherAppWithJSONFile.java | 47 +++ .../examples/enricher/LineOutputOperator.java | 34 +++ .../org/apache/apex/examples/enricher/POJO.java | 49 +++ .../apex/examples/enricher/POJOEnriched.java | 71 +++++ .../src/main/resources/META-INF/properties.xml | 6 +- .../com/example/myapexapp/ApplicationTest.java | 31 -- .../apex/examples/enricher/ApplicationTest.java | 31 ++ examples/filter/pom.xml | 276 +---------------- .../tutorial/filter/Application.java | 49 --- .../tutorial/filter/TransactionPOJO.java | 64 ---- .../apex/examples/filter/Application.java | 49 +++ .../apex/examples/filter/TransactionPOJO.java | 62 ++++ .../src/main/resources/META-INF/properties.xml | 8 +- .../tutorial/filter/ApplicationTest.java | 111 ------- .../apex/examples/filter/ApplicationTest.java | 96 ++++++ examples/innerjoin/pom.xml | 269 ++--------------- .../com/example/join/InnerJoinApplication.java | 39 --- .../java/com/example/join/POJOGenerator.java | 260 ---------------- .../innerjoin/InnerJoinApplication.java | 38 +++ .../apex/examples/innerjoin/POJOGenerator.java | 260 ++++++++++++++++ .../example/join/InnerJoinApplicationTest.java | 21 -- .../innerjoin/InnerJoinApplicationTest.java | 21 ++ examples/parser/pom.xml | 268 ++--------------- .../tutorial/jsonparser/Application.java | 35 --- .../tutorial/jsonparser/Campaign.java | 74 ----- .../tutorial/jsonparser/JsonGenerator.java | 83 ----- .../examples/parser/jsonparser/Application.java | 35 +++ .../examples/parser/jsonparser/Campaign.java | 74 +++++ .../parser/jsonparser/JsonGenerator.java | 
83 +++++ .../src/main/resources/META-INF/properties.xml | 4 +- .../tutorial/jsonparser/ApplicationTest.java | 36 --- .../parser/jsonparser/ApplicationTest.java | 35 +++ examples/partition/pom.xml | 276 +---------------- .../java/com/example/myapexapp/Application.java | 27 -- .../main/java/com/example/myapexapp/Codec3.java | 13 - .../myapexapp/RandomNumberGenerator.java | 83 ----- .../com/example/myapexapp/TestPartition.java | 164 ---------- .../apex/examples/partition/Application.java | 25 ++ .../apache/apex/examples/partition/Codec3.java | 13 + .../partition/RandomNumberGenerator.java | 76 +++++ .../apex/examples/partition/TestPartition.java | 149 +++++++++ .../src/main/resources/my-log4j.properties | 2 +- .../com/example/myapexapp/ApplicationTest.java | 37 --- .../examples/partition/ApplicationTest.java | 36 +++ examples/pom.xml | 10 + examples/recordReader/pom.xml | 284 +----------------- .../com/example/recordReader/Application.java | 32 -- .../recordReader/TransactionsSchema.java | 168 ----------- .../apex/examples/recordReader/Application.java | 32 ++ .../recordReader/TransactionsSchema.java | 168 +++++++++++ .../src/main/resources/META-INF/properties.xml | 4 +- .../example/recordReader/ApplicationTest.java | 91 ------ .../examples/recordReader/ApplicationTest.java | 91 ++++++ examples/throttle/pom.xml | 256 +--------------- .../examples/throttle/Application.java | 51 ---- .../examples/throttle/PassThroughOperator.java | 20 -- .../throttle/RandomNumberGenerator.java | 64 ---- .../examples/throttle/SlowDevNullOperator.java | 35 --- .../throttle/ThrottlingStatsListener.java | 150 ---------- .../apex/examples/throttle/Application.java | 51 ++++ .../examples/throttle/PassThroughOperator.java | 20 ++ .../throttle/RandomNumberGenerator.java | 64 ++++ .../examples/throttle/SlowDevNullOperator.java | 35 +++ .../throttle/ThrottlingStatsListener.java | 150 ++++++++++ .../examples/throttle/ApplicationTest.java | 37 --- .../apex/examples/throttle/ApplicationTest.java | 
36 +++ examples/transform/pom.xml | 250 +--------------- .../java/com/example/transform/Application.java | 39 --- .../com/example/transform/CustomerEvent.java | 74 ----- .../com/example/transform/CustomerInfo.java | 60 ---- .../transform/DynamicTransformApplication.java | 52 ---- .../com/example/transform/POJOGenerator.java | 125 -------- .../apex/examples/transform/Application.java | 39 +++ .../apex/examples/transform/CustomerEvent.java | 74 +++++ .../apex/examples/transform/CustomerInfo.java | 60 ++++ .../transform/DynamicTransformApplication.java | 51 ++++ .../apex/examples/transform/POJOGenerator.java | 125 ++++++++ .../com/example/transform/ApplicationTest.java | 21 -- .../examples/transform/ApplicationTest.java | 21 ++ 110 files changed, 3404 insertions(+), 6088 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/pom.xml ---------------------------------------------------------------------- diff --git a/examples/csvformatter/pom.xml b/examples/csvformatter/pom.xml index 9033db5..be3be7a 100644 --- a/examples/csvformatter/pom.xml +++ b/examples/csvformatter/pom.xml @@ -1,266 +1,61 @@ <?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +--> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>formatter</artifactId> + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-csvformatter</artifactId> <packaging>jar</packaging> <!-- change these to the appropriate values --> <name>Formatter Apps</name> <description>Applications to showcase different formatters</description> - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - <malhar.version>3.6.0</malhar.version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - 
<includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - 
<goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - 
<include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - <dependencies> - <!-- add your dependencies here --> <dependency> <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>${malhar.version}</version> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package. 
- --> - <!-- + <artifactId>malhar-contrib</artifactId> + <version>${project.version}</version> <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> </exclusions> - --> </dependency> <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-contrib</artifactId> - <version>${malhar.version}</version> + <groupId>org.codehaus.jettison</groupId> + <artifactId>jettison</artifactId> + <version>1.1</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> - <version>2.5.4</version> + <version>2.7.0</version> </dependency> <dependency> <groupId>com.github.fge</groupId> @@ -269,29 +64,9 @@ <optional>true</optional> </dependency> <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> - </dependency> - - <dependency> <groupId>net.sf.supercsv</groupId> <artifactId>super-csv</artifactId> <version>2.4.0</version> </dependency> </dependencies> - </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java deleted file mode 100644 index a4ff06f..0000000 --- a/examples/csvformatter/src/main/java/com/demo/myapexapp/Application.java +++ /dev/null @@ -1,45 +0,0 @@ -package 
com.demo.myapexapp; - -import java.util.Arrays; - -import com.datatorrent.contrib.parser.JsonParser; - -import org.apache.apex.malhar.contrib.parser.StreamingJsonParser; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.Context.PortContext; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StatsListener; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.formatter.CsvFormatter; -import com.datatorrent.lib.appdata.schemas.SchemaUtils; -import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner; -import com.datatorrent.lib.partitioner.StatelessThroughputBasedPartitioner; - -@ApplicationAnnotation(name = "CustomOutputFormatter") -public class Application implements StreamingApplication -{ - //Set the delimiters and schema structure for the custom output in schema.json - private static final String filename = "schema.json"; - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - JsonGenerator generator = dag.addOperator("JsonGenerator", JsonGenerator.class); - JsonParser jsonParser = dag.addOperator("jsonParser", JsonParser.class); - - CsvFormatter formatter = dag.addOperator("formatter", CsvFormatter.class); - formatter.setSchema(SchemaUtils.jarResourceFileToString(filename)); - dag.setInputPortAttribute(formatter.in, PortContext.TUPLE_CLASS, PojoEvent.class); - - HDFSOutputOperator<String> hdfsOutput = dag.addOperator("HDFSOutputOperator", HDFSOutputOperator.class); - hdfsOutput.setLineDelimiter(""); - - dag.addStream("parserStream", generator.out, jsonParser.in); - dag.addStream("formatterStream", jsonParser.out, formatter.in); - dag.addStream("outputStream", formatter.out, hdfsOutput.input); - - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java 
---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java deleted file mode 100644 index 5cb162c..0000000 --- a/examples/csvformatter/src/main/java/com/demo/myapexapp/HDFSOutputOperator.java +++ /dev/null @@ -1,87 +0,0 @@ -package com.demo.myapexapp; - -import javax.validation.constraints.NotNull; - -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; - -/** - * HDFSoutput operator with implementation to write Objects to HDFS - * - * @param <T> - */ -public class HDFSOutputOperator<T> extends AbstractFileOutputOperator<T> -{ - - @NotNull - String outFileName; - - //setting default value - String lineDelimiter = "\n"; - - //Switch to write the files to HDFS - set to false to diable writes - private boolean writeFilesFlag = true; - - int id; - - @Override - public void setup(OperatorContext context) - { - super.setup(context); - id = context.getId(); - } - - public boolean isWriteFilesFlag() - { - return writeFilesFlag; - } - - public void setWriteFilesFlag(boolean writeFilesFlag) - { - this.writeFilesFlag = writeFilesFlag; - } - - public String getOutFileName() - { - return outFileName; - } - - public void setOutFileName(String outFileName) - { - this.outFileName = outFileName; - } - - @Override - protected String getFileName(T tuple) - { - return getOutFileName() + id; - } - - public String getLineDelimiter() - { - return lineDelimiter; - } - - public void setLineDelimiter(String lineDelimiter) - { - this.lineDelimiter = lineDelimiter; - } - - @Override - protected byte[] getBytesForTuple(T tuple) - { - String temp = tuple.toString().concat(String.valueOf(lineDelimiter)); - byte[] theByteArray = temp.getBytes(); - - return theByteArray; - } - - @Override - protected void processTuple(T tuple) - { - if (writeFilesFlag) { - 
} - super.processTuple(tuple); - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java deleted file mode 100644 index f50f300..0000000 --- a/examples/csvformatter/src/main/java/com/demo/myapexapp/JsonGenerator.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.demo.myapexapp; - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Random; - -import javax.validation.constraints.Min; - -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; - -public class JsonGenerator extends BaseOperator implements InputOperator -{ - - private static final Logger LOG = LoggerFactory.getLogger(JsonGenerator.class); - - @Min(1) - private int numTuples = 20; - private transient int count = 0; - - public static Random rand = new Random(); - private int sleepTime=5; - - public final transient DefaultOutputPort<byte[]> out = new DefaultOutputPort<byte[]>(); - - private static String getJson() - { - - JSONObject obj = new JSONObject(); - try { - obj.put("campaignId", 1234); - obj.put("campaignName", "SimpleCsvFormatterExample"); - obj.put("campaignBudget", 10000.0); - obj.put("weatherTargeting", "false"); - obj.put("securityCode", "APEX"); - } catch (JSONException e) { - return null; - } - return obj.toString(); - } - - @Override - public void beginWindow(long windowId) - { - count = 0; - } - - @Override - public void emitTuples() - { - if (count++ < numTuples) { - out.emit(getJson().getBytes()); - } else { - 
try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted"); - } - } - } - - public int getNumTuples() - { - return numTuples; - } - - public void setNumTuples(int numTuples) - { - this.numTuples = numTuples; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java b/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java deleted file mode 100644 index 8514856..0000000 --- a/examples/csvformatter/src/main/java/com/demo/myapexapp/PojoEvent.java +++ /dev/null @@ -1,141 +0,0 @@ -package com.demo.myapexapp; - -import java.util.Date; - -public class PojoEvent -{ - - private int advId; - private int campaignId; - private String campaignName; - private double campaignBudget; - private Date startDate; - private Date endDate; - private String securityCode; - private boolean weatherTargeting; - private boolean optimized; - private String parentCampaign; - private Character weatherTargeted; - private String valid; - - public int getAdvId() - { - return advId; - } - - public void setAdvId(int AdId) - { - this.advId = advId; - } - - public int getCampaignId() - { - return campaignId; - } - - public void setCampaignId(int campaignId) - { - this.campaignId = campaignId; - } - - public String getCampaignName() - { - return campaignName; - } - - public void setCampaignName(String campaignName) - { - this.campaignName = campaignName; - } - - public double getCampaignBudget() - { - return campaignBudget; - } - - public void setCampaignBudget(double campaignBudget) - { - this.campaignBudget = campaignBudget; - } - - public Date getStartDate() - { - return startDate; - } - - public void setStartDate(Date startDate) - { - this.startDate = startDate; - } - - public Date getEndDate() - { - return 
endDate; - } - - public void setEndDate(Date endDate) - { - this.endDate = endDate; - } - - public String getSecurityCode() - { - return securityCode; - } - - public void setSecurityCode(String securityCode) - { - this.securityCode = securityCode; - } - - public boolean isWeatherTargeting() - { - return weatherTargeting; - } - - public void setWeatherTargeting(boolean weatherTargeting) - { - this.weatherTargeting = weatherTargeting; - } - - public boolean isOptimized() - { - return optimized; - } - - public void setOptimized(boolean optimized) - { - this.optimized = optimized; - } - - public String getParentCampaign() - { - return parentCampaign; - } - - public void setParentCampaign(String parentCampaign) - { - this.parentCampaign = parentCampaign; - } - - public Character getWeatherTargeted() - { - return weatherTargeted; - } - - public void setWeatherTargeted(Character weatherTargeted) - { - this.weatherTargeted = weatherTargeted; - } - - public String getValid() - { - return valid; - } - - public void setValid(String valid) - { - this.valid = valid; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java new file mode 100644 index 0000000..cc9ee79 --- /dev/null +++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/Application.java @@ -0,0 +1,37 @@ +package org.apache.apex.examples.csvformatter; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import 
com.datatorrent.contrib.formatter.CsvFormatter; +import com.datatorrent.contrib.parser.JsonParser; +import com.datatorrent.lib.appdata.schemas.SchemaUtils; + +@ApplicationAnnotation(name = "CustomOutputFormatter") +public class Application implements StreamingApplication +{ + //Set the delimiters and schema structure for the custom output in schema.json + private static final String filename = "schema.json"; + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + JsonGenerator generator = dag.addOperator("JsonGenerator", JsonGenerator.class); + JsonParser jsonParser = dag.addOperator("jsonParser", JsonParser.class); + + CsvFormatter formatter = dag.addOperator("formatter", CsvFormatter.class); + formatter.setSchema(SchemaUtils.jarResourceFileToString(filename)); + dag.setInputPortAttribute(formatter.in, PortContext.TUPLE_CLASS, PojoEvent.class); + + HDFSOutputOperator<String> hdfsOutput = dag.addOperator("HDFSOutputOperator", HDFSOutputOperator.class); + hdfsOutput.setLineDelimiter(""); + + dag.addStream("parserStream", generator.out, jsonParser.in); + dag.addStream("formatterStream", jsonParser.out, formatter.in); + dag.addStream("outputStream", formatter.out, hdfsOutput.input); + + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java new file mode 100644 index 0000000..7cdd8bb --- /dev/null +++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/HDFSOutputOperator.java @@ -0,0 +1,87 @@ +package org.apache.apex.examples.csvformatter; + +import javax.validation.constraints.NotNull; + +import 
com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * HDFSoutput operator with implementation to write Objects to HDFS + * + * @param <T> + */ +public class HDFSOutputOperator<T> extends AbstractFileOutputOperator<T> +{ + + @NotNull + String outFileName; + + //setting default value + String lineDelimiter = "\n"; + + //Switch to write the files to HDFS - set to false to diable writes + private boolean writeFilesFlag = true; + + int id; + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + id = context.getId(); + } + + public boolean isWriteFilesFlag() + { + return writeFilesFlag; + } + + public void setWriteFilesFlag(boolean writeFilesFlag) + { + this.writeFilesFlag = writeFilesFlag; + } + + public String getOutFileName() + { + return outFileName; + } + + public void setOutFileName(String outFileName) + { + this.outFileName = outFileName; + } + + @Override + protected String getFileName(T tuple) + { + return getOutFileName() + id; + } + + public String getLineDelimiter() + { + return lineDelimiter; + } + + public void setLineDelimiter(String lineDelimiter) + { + this.lineDelimiter = lineDelimiter; + } + + @Override + protected byte[] getBytesForTuple(T tuple) + { + String temp = tuple.toString().concat(String.valueOf(lineDelimiter)); + byte[] theByteArray = temp.getBytes(); + + return theByteArray; + } + + @Override + protected void processTuple(T tuple) + { + if (writeFilesFlag) { + } + super.processTuple(tuple); + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java ---------------------------------------------------------------------- diff --git a/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java new file mode 100644 
index 0000000..9b7698c --- /dev/null +++ b/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/JsonGenerator.java @@ -0,0 +1,76 @@ +package org.apache.apex.examples.csvformatter; + +import java.util.Random; + +import javax.validation.constraints.Min; + +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +public class JsonGenerator extends BaseOperator implements InputOperator +{ + + private static final Logger LOG = LoggerFactory.getLogger(JsonGenerator.class); + + @Min(1) + private int numTuples = 20; + private transient int count = 0; + + public static Random rand = new Random(); + private int sleepTime=5; + + public final transient DefaultOutputPort<byte[]> out = new DefaultOutputPort<byte[]>(); + + private static String getJson() + { + + JSONObject obj = new JSONObject(); + try { + obj.put("campaignId", 1234); + obj.put("campaignName", "SimpleCsvFormatterExample"); + obj.put("campaignBudget", 10000.0); + obj.put("weatherTargeting", "false"); + obj.put("securityCode", "APEX"); + } catch (JSONException e) { + return null; + } + return obj.toString(); + } + + @Override + public void beginWindow(long windowId) + { + count = 0; + } + + @Override + public void emitTuples() + { + if (count++ < numTuples) { + out.emit(getJson().getBytes()); + } else { + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted"); + } + } + } + + public int getNumTuples() + { + return numTuples; + } + + public void setNumTuples(int numTuples) + { + this.numTuples = numTuples; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/main/java/org/apache/apex/examples/csvformatter/PojoEvent.java 
/**
 * Plain mutable bean describing an advertising campaign record. It only stores
 * the values handed to its setters and returns them from the matching getters.
 */
public class PojoEvent
{

  private int advId;
  private int campaignId;
  private String campaignName;
  private double campaignBudget;
  private Date startDate;
  private Date endDate;
  private String securityCode;
  private boolean weatherTargeting;
  private boolean optimized;
  private String parentCampaign;
  private Character weatherTargeted;
  private String valid;

  /** @return the advertiser id */
  public int getAdvId()
  {
    return advId;
  }

  /**
   * @param advId the advertiser id to store
   */
  public void setAdvId(int advId)
  {
    // The parameter used to be named AdId, which turned this assignment into a
    // self-assignment of the field and silently discarded the argument.
    this.advId = advId;
  }

  /** @return the campaign id */
  public int getCampaignId()
  {
    return campaignId;
  }

  /** @param campaignId the campaign id to store */
  public void setCampaignId(int campaignId)
  {
    this.campaignId = campaignId;
  }

  /** @return the campaign name */
  public String getCampaignName()
  {
    return campaignName;
  }

  /** @param campaignName the campaign name to store */
  public void setCampaignName(String campaignName)
  {
    this.campaignName = campaignName;
  }

  /** @return the campaign budget */
  public double getCampaignBudget()
  {
    return campaignBudget;
  }

  /** @param campaignBudget the campaign budget to store */
  public void setCampaignBudget(double campaignBudget)
  {
    this.campaignBudget = campaignBudget;
  }

  /** @return the start date (the stored instance, not a copy) */
  public Date getStartDate()
  {
    return startDate;
  }

  /** @param startDate the start date to store (kept by reference) */
  public void setStartDate(Date startDate)
  {
    this.startDate = startDate;
  }

  /** @return the end date (the stored instance, not a copy) */
  public Date getEndDate()
  {
    return endDate;
  }

  /** @param endDate the end date to store (kept by reference) */
  public void setEndDate(Date endDate)
  {
    this.endDate = endDate;
  }

  /** @return the security code */
  public String getSecurityCode()
  {
    return securityCode;
  }

  /** @param securityCode the security code to store */
  public void setSecurityCode(String securityCode)
  {
    this.securityCode = securityCode;
  }

  /** @return the weather-targeting flag */
  public boolean isWeatherTargeting()
  {
    return weatherTargeting;
  }

  /** @param weatherTargeting the weather-targeting flag to store */
  public void setWeatherTargeting(boolean weatherTargeting)
  {
    this.weatherTargeting = weatherTargeting;
  }

  /** @return the optimized flag */
  public boolean isOptimized()
  {
    return optimized;
  }

  /** @param optimized the optimized flag to store */
  public void setOptimized(boolean optimized)
  {
    this.optimized = optimized;
  }

  /** @return the parent campaign */
  public String getParentCampaign()
  {
    return parentCampaign;
  }

  /** @param parentCampaign the parent campaign to store */
  public void setParentCampaign(String parentCampaign)
  {
    this.parentCampaign = parentCampaign;
  }

  /** @return the weather-targeted marker, possibly {@code null} */
  public Character getWeatherTargeted()
  {
    return weatherTargeted;
  }

  /** @param weatherTargeted the weather-targeted marker to store */
  public void setWeatherTargeted(Character weatherTargeted)
  {
    this.weatherTargeted = weatherTargeted;
  }

  /** @return the value of the {@code valid} field */
  public String getValid()
  {
    return valid;
  }

  /** @param valid the value to store in the {@code valid} field */
  public void setValid(String valid)
  {
    this.valid = valid;
  }

}
a/examples/csvformatter/src/test/java/com/demo/myapexapp/ApplicationTest.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.demo.myapexapp; - -import java.io.File; -import java.io.IOException; -import java.util.Collection; - -import javax.validation.ConstraintViolationException; - -import org.apache.commons.io.FileUtils; - -import org.junit.AfterClass; -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; - -import org.junit.Test; - -import com.datatorrent.api.LocalMode; - -/** - * Test the DAG declaration in local mode. - */ -public class ApplicationTest -{ - - private static final String FILE_NAME = "/tmp/formatterApp"; - - @AfterClass - public static void cleanup() - { - try { - FileUtils.deleteDirectory(new File(FILE_NAME)); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Test - public void testApplication() throws Exception - { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - - // wait for output files to roll - Thread.sleep(5000); - - String[] extensions = {"dat.0", "tmp"}; - Collection<File> list = FileUtils.listFiles(new File(FILE_NAME), extensions, false); - - for (File file : list) { - for (String line : FileUtils.readLines(file)) { - Assert.assertEquals("Delimiter in record", true, (line.equals( - "1234|0|SimpleCsvFormatterExample|10000.0|||APEX|false|false||"))); - } - } - - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java ---------------------------------------------------------------------- diff --git 
a/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java b/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java new file mode 100644 index 0000000..67d5fd0 --- /dev/null +++ b/examples/csvformatter/src/test/java/org/apache/apex/examples/csvformatter/ApplicationTest.java @@ -0,0 +1,65 @@ +package org.apache.apex.examples.csvformatter; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; + +import javax.validation.ConstraintViolationException; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Test; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. + */ +public class ApplicationTest +{ + + private static final String FILE_NAME = "/tmp/formatterApp"; + + @AfterClass + public static void cleanup() + { + try { + FileUtils.deleteDirectory(new File(FILE_NAME)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Test + public void testApplication() throws Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for output files to roll + Thread.sleep(5000); + + String[] extensions = {"dat.0", "tmp"}; + Collection<File> list = FileUtils.listFiles(new File(FILE_NAME), extensions, false); + + for (File file : list) { + for (String line : FileUtils.readLines(file)) { + Assert.assertEquals("Delimiter in record", true, (line.equals( + "1234|0|SimpleCsvFormatterExample|10000.0|||APEX|false|false||"))); + } + } + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + 
+} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/pom.xml ---------------------------------------------------------------------- diff --git a/examples/dedup/pom.xml b/examples/dedup/pom.xml index f777784..ba5a24d 100644 --- a/examples/dedup/pom.xml +++ b/examples/dedup/pom.xml @@ -2,279 +2,30 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>dedup</artifactId> + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-dedup</artifactId> <packaging>jar</packaging> <!-- change these to the appropriate values --> - <name>My Apex Application</name> - <description>My Apex Application Description</description> - - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <malhar.version>3.6.0</malhar.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - 
<id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - 
<delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - 
</executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> + <name>Dedup Application</name> + <description>Dedup Application</description> <dependencies> - <!-- add your dependencies here --> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>${malhar.version}</version> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package. 
- --> - <!-- - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - --> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> - </dependency> <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>janino</artifactId> <version>2.7.8</version> </dependency> + <dependency> + <groupId>joda-time</groupId> + <artifactId>joda-time</artifactId> + <version>2.9.1</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/main/java/com/example/dedup/Application.java ---------------------------------------------------------------------- diff --git a/examples/dedup/src/main/java/com/example/dedup/Application.java b/examples/dedup/src/main/java/com/example/dedup/Application.java deleted file mode 100644 index cabdce2..0000000 --- a/examples/dedup/src/main/java/com/example/dedup/Application.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Put your copyright and license info here. 
- */ -package com.example.dedup; - -import java.util.Date; -import java.util.Random; - -import org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.partitioner.StatelessPartitioner; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; - -@ApplicationAnnotation(name="DedupExample") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // Test Data Generator Operator - RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator()); - - // Dedup Operator. Configuration through resources/META-INF/properties.xml - TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator()); - - // Console output operator for unique tuples - ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator()); - - // Console output operator for duplicate tuples - ConsoleOutputOperator consoleDuplicate = dag.addOperator("ConsoleDuplicate", new ConsoleOutputOperator()); - - // Console output operator for duplicate tuples - ConsoleOutputOperator consoleExpired = dag.addOperator("ConsoleExpired", new ConsoleOutputOperator()); - - // Streams - dag.addStream("Generator to Dedup", gen.output, dedup.input); - - // Connect Dedup unique to Console - dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input); - // Connect Dedup duplicate to Console - dag.addStream("Dedup Duplicate to Console", dedup.duplicate, consoleDuplicate.input); - // Connect Dedup expired to Console - dag.addStream("Dedup Expired to 
Console", dedup.expired, consoleExpired.input); - - // Set Attribute TUPLE_CLASS for supplying schema information to the port - dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class); - - // Uncomment the following line to create multiple partitions for Dedup operator. In this case: 2 - // dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TimeBasedDedupOperator>(2)); - } - - public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator - { - - public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>(); - private final transient Random r = new Random(); - private int tuplesPerWindow = 100; - private transient int count = 0; - - @Override - public void beginWindow(long windowId) { - count = 0; - } - - @Override - public void emitTuples() - { - if (count++ > tuplesPerWindow) { - return; - } - TestEvent event = new TestEvent(); - event.id = r.nextInt(100); - event.eventTime = new Date(System.currentTimeMillis() - (r.nextInt(60 * 1000))); - output.emit(event); - } - } - - public static class TestEvent - { - private int id; - private Date eventTime; - - public TestEvent() - { - } - - public int getId() - { - return id; - } - - public void setId(int id) - { - this.id = id; - } - - public Date getEventTime() - { - return eventTime; - } - - public void setEventTime(Date eventTime) - { - this.eventTime = eventTime; - } - - @Override - public String toString() { - return "TestEvent [id=" + id + ", eventTime=" + eventTime + "]"; - } - - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java ---------------------------------------------------------------------- diff --git a/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java b/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java new file mode 100644 index 
0000000..2498d62 --- /dev/null +++ b/examples/dedup/src/main/java/org/apache/apex/examples/dedup/Application.java @@ -0,0 +1,122 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.dedup; + +import java.util.Date; +import java.util.Random; + +import org.apache.apex.malhar.lib.dedup.TimeBasedDedupOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +@ApplicationAnnotation(name="DedupExample") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Test Data Generator Operator + RandomDataGeneratorOperator gen = dag.addOperator("RandomGenerator", new RandomDataGeneratorOperator()); + + // Dedup Operator. 
Configuration through resources/META-INF/properties.xml + TimeBasedDedupOperator dedup = dag.addOperator("Deduper", new TimeBasedDedupOperator()); + + // Console output operator for unique tuples + ConsoleOutputOperator consoleUnique = dag.addOperator("ConsoleUnique", new ConsoleOutputOperator()); + + // Console output operator for duplicate tuples + ConsoleOutputOperator consoleDuplicate = dag.addOperator("ConsoleDuplicate", new ConsoleOutputOperator()); + + // Console output operator for duplicate tuples + ConsoleOutputOperator consoleExpired = dag.addOperator("ConsoleExpired", new ConsoleOutputOperator()); + + // Streams + dag.addStream("Generator to Dedup", gen.output, dedup.input); + + // Connect Dedup unique to Console + dag.addStream("Dedup Unique to Console", dedup.unique, consoleUnique.input); + // Connect Dedup duplicate to Console + dag.addStream("Dedup Duplicate to Console", dedup.duplicate, consoleDuplicate.input); + // Connect Dedup expired to Console + dag.addStream("Dedup Expired to Console", dedup.expired, consoleExpired.input); + + // Set Attribute TUPLE_CLASS for supplying schema information to the port + dag.setInputPortAttribute(dedup.input, Context.PortContext.TUPLE_CLASS, TestEvent.class); + + // Uncomment the following line to create multiple partitions for Dedup operator. 
In this case: 2 + // dag.setAttribute(dedup, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TimeBasedDedupOperator>(2)); + } + + public static class RandomDataGeneratorOperator extends BaseOperator implements InputOperator + { + + public final transient DefaultOutputPort<TestEvent> output = new DefaultOutputPort<>(); + private final transient Random r = new Random(); + private int tuplesPerWindow = 100; + private transient int count = 0; + + @Override + public void beginWindow(long windowId) { + count = 0; + } + + @Override + public void emitTuples() + { + if (count++ > tuplesPerWindow) { + return; + } + TestEvent event = new TestEvent(); + event.id = r.nextInt(100); + event.eventTime = new Date(System.currentTimeMillis() - (r.nextInt(60 * 1000))); + output.emit(event); + } + } + + public static class TestEvent + { + private int id; + private Date eventTime; + + public TestEvent() + { + } + + public int getId() + { + return id; + } + + public void setId(int id) + { + this.id = id; + } + + public Date getEventTime() + { + return eventTime; + } + + public void setEventTime(Date eventTime) + { + this.eventTime = eventTime; + } + + @Override + public String toString() { + return "TestEvent [id=" + id + ", eventTime=" + eventTime + "]"; + } + + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java b/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java deleted file mode 100644 index 9c9f17c..0000000 --- a/examples/dedup/src/test/java/com/example/dedup/ApplicationTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Put your copyright and license info here. 
- */ -package com.example.dedup; - -import java.io.IOException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -import com.datatorrent.api.LocalMode; -import com.example.dedup.Application; - -/** - * Test the DAG declaration in local mode. - */ -public class ApplicationTest { - - @Test - public void testApplication() throws IOException, Exception { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - Thread.sleep(10 * 1000); - lc.shutdown(); - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java b/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java new file mode 100644 index 0000000..3304a04 --- /dev/null +++ b/examples/dedup/src/test/java/org/apache/apex/examples/dedup/ApplicationTest.java @@ -0,0 +1,37 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.dedup; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. 
+ */ +public class ApplicationTest { + + @Test + public void testApplication() throws IOException, Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + Thread.sleep(10 * 1000); + lc.shutdown(); + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/pom.xml ---------------------------------------------------------------------- diff --git a/examples/dynamic-partition/pom.xml b/examples/dynamic-partition/pom.xml index 34e91ee..21b1c30 100644 --- a/examples/dynamic-partition/pom.xml +++ b/examples/dynamic-partition/pom.xml @@ -1,273 +1,25 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>dynamic-partition</artifactId> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-dynamic-partition</artifactId> <packaging>jar</packaging> <!-- change these to the appropriate values --> <name>Dynamic Partitioning</name> <description>Example showing dynamic partitioning</description> - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - </properties> - - <build> - <plugins> - <plugin> - 
<groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> 
- </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - 
<useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - <dependencies> - <!-- add your dependencies here --> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - 
<version>3.6.0</version> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package. - --> - <!-- - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - --> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>2.24.0</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java ---------------------------------------------------------------------- diff --git a/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java b/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java deleted file mode 100644 index 9eec263..0000000 --- a/examples/dynamic-partition/src/main/java/com/example/dynamic/App.java +++ /dev/null @@ -1,23 +0,0 @@ -package com.example.dynamic; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.DAG; - -import com.datatorrent.lib.stream.DevNull; - -@ApplicationAnnotation(name="Dyn") -public class App implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - Gen gen = dag.addOperator("gen", Gen.class); - DevNull devNull = dag.addOperator("devNull", 
DevNull.class); - - dag.addStream("data", gen.out, devNull.data); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java ---------------------------------------------------------------------- diff --git a/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java b/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java deleted file mode 100644 index 4cccd23..0000000 --- a/examples/dynamic-partition/src/main/java/com/example/dynamic/Gen.java +++ /dev/null @@ -1,169 +0,0 @@ -package com.example.dynamic; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import com.google.common.collect.Lists; -import org.apache.commons.io.IOUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.validation.constraints.NotNull; -import java.io.ByteArrayOutputStream; -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import com.datatorrent.api.DefaultPartition; -import com.datatorrent.api.Partitioner; -import com.datatorrent.api.StatsListener; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; - -/** - * Operator that dynamically partitions itself after 500 tuples have been emitted - */ -public class Gen extends BaseOperator implements InputOperator, Partitioner<Gen>, StatsListener -{ - private static final Logger LOG = LoggerFactory.getLogger(Gen.class); - - private static final int MAX_PARTITIONS = 4; // maximum number of partitions - - private int partitions = 2; // initial number of partitions - - @NotNull - private int numTuples; // number of tuples to emit per window - - private transient int count = 0; - - public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>(); - - @Override - public void partitioned(Map<Integer, 
Partition<Gen>> map) - { - if (partitions != map.size()) { - String msg = String.format("partitions = %d, map.size = %d%n", partitions, map.size()); - throw new RuntimeException(msg); - } - } - - @Override - public void beginWindow(long windowId) - { - count = 0; - } - - @Override - public void emitTuples() - { - if (count < numTuples) { - ++count; - out.emit(Math.random()); - } - } - - public int getNumTuples() - { - return numTuples; - } - - /** - * Sets the number of tuples to be emitted every window. - * @param numTuples number of tuples - */ - public void setNumTuples(int numTuples) - { - this.numTuples = numTuples; - } - - @Override - public Response processStats(BatchedOperatorStats batchedOperatorStats) { - - final long emittedCount = batchedOperatorStats.getTuplesEmittedPSMA(); - - // we only perform a single dynamic repartition - Response res = new Response(); - res.repartitionRequired = false; - if (emittedCount > 500 && partitions < MAX_PARTITIONS) { - LOG.info("processStats: trying repartition of input operator current {} required {}", - partitions, MAX_PARTITIONS); - LOG.info("**** operator id = {}, window id = {}, tuplesProcessedPSMA = {}, tuplesEmittedPSMA = {}", - batchedOperatorStats.getOperatorId(), - batchedOperatorStats.getCurrentWindowId(), - batchedOperatorStats.getTuplesProcessedPSMA(), - emittedCount); - partitions = MAX_PARTITIONS; - res.repartitionRequired = true; - } - - return res; - } // processStats - - /** - * Clone object by serializing and deserializing using Kryo. - * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields. 
- * - * @param kryo kryo object used to clone objects - * @param src src object that copy from - * @return cloned object - */ - @SuppressWarnings("unchecked") - private static <SRC> SRC cloneObject(Kryo kryo, SRC src) - { - kryo.setClassLoader(src.getClass().getClassLoader()); - ByteArrayOutputStream bos = null; - Output output; - Input input = null; - try { - bos = new ByteArrayOutputStream(); - output = new Output(bos); - kryo.writeObject(output, src); - output.close(); - input = new Input(bos.toByteArray()); - return (SRC)kryo.readObject(input, src.getClass()); - } finally { - IOUtils.closeQuietly(input); - IOUtils.closeQuietly(bos); - } - } - - @Override - public Collection<Partition<Gen>> definePartitions( - Collection<Partition<Gen>> list, PartitioningContext context) - { - if (partitions < 0) { // error - String msg = String.format("Error: Bad value: partitions = %d%n", partitions); - LOG.error(msg); - throw new RuntimeException(msg); - } - - final int prevCount = list.size(); - if (1 == prevCount) { // initial call - LOG.info("definePartitions: First call, prevCount = {}, partitions = {}", - prevCount, partitions); - } - - if (prevCount == partitions) { - LOG.info("definePartitions: Nothing to do in definePartitions"); - return list; // nothing to do - } - - LOG.debug("definePartitions: Repartitioning from {} to {}", prevCount, partitions); - - Kryo kryo = new Kryo(); - - // return value: new list of partitions (includes old list) - List<Partition<Gen>> newPartitions = Lists.newArrayListWithExpectedSize(partitions); - - for (int i = 0; i < partitions; i++) { - Gen oper = cloneObject(kryo, this); - newPartitions.add(new DefaultPartition<>(oper)); - } - - LOG.info("definePartition: returning {} partitions", newPartitions.size()); - return newPartitions; - } - -}
