http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/recordReader/pom.xml ---------------------------------------------------------------------- diff --git a/examples/recordReader/pom.xml b/examples/recordReader/pom.xml index 007eaef..25ac191 100644 --- a/examples/recordReader/pom.xml +++ b/examples/recordReader/pom.xml @@ -1,267 +1,28 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>recordReader</artifactId> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-recordReader</artifactId> <packaging>jar</packaging> <name>File Record Reader</name> <description>Simple application illustrating use of record reader operator</description> - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - <malhar.version>3.6.0</malhar.version> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - <dependencies> - <!-- add your dependencies here --> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>${malhar.version}</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> <dependency> <groupId>org.apache.apex</groupId> <artifactId>malhar-contrib</artifactId> - <version>${malhar.version}</version> - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-contrib</artifactId> - <version>${malhar.version}</version> - <classifier>tests</classifier> - <scope>test</scope> + <version>${project.version}</version> + <!-- + If you know that your application does not need transitive dependencies pulled in by malhar-library, + uncomment the following to reduce the size of your app package. + --> <exclusions> <exclusion> <groupId>*</groupId> @@ -274,24 +35,5 @@ <artifactId>super-csv</artifactId> <version>2.4.0</version> </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>4.10</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> - <scope>test</scope> - </dependency> </dependencies> - </project>
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/recordReader/src/main/java/com/example/recordReader/Application.java ---------------------------------------------------------------------- diff --git a/examples/recordReader/src/main/java/com/example/recordReader/Application.java b/examples/recordReader/src/main/java/com/example/recordReader/Application.java deleted file mode 100644 index 8b0dd75..0000000 --- a/examples/recordReader/src/main/java/com/example/recordReader/Application.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Put your copyright and license info here. - */ -package com.example.recordReader; - -import org.apache.apex.malhar.lib.fs.FSRecordReaderModule; -import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.contrib.formatter.CsvFormatter; -import com.datatorrent.contrib.parser.CsvParser; - -@ApplicationAnnotation(name="RecordReaderExample") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - FSRecordReaderModule recordReader = dag.addModule("recordReader", FSRecordReaderModule.class); - CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class); - CsvFormatter formatter = dag.addOperator("formatter", new CsvFormatter()); - StringFileOutputOperator fileOutput = dag.addOperator("fileOutput", new StringFileOutputOperator()); - - dag.addStream("record", recordReader.records, csvParser.in); - dag.addStream("pojo", csvParser.out, formatter.in); - dag.addStream("string", formatter.out, fileOutput.input); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/recordReader/src/main/java/com/example/recordReader/TransactionsSchema.java ---------------------------------------------------------------------- diff --git a/examples/recordReader/src/main/java/com/example/recordReader/TransactionsSchema.java b/examples/recordReader/src/main/java/com/example/recordReader/TransactionsSchema.java deleted file mode 100644 index de20516..0000000 --- a/examples/recordReader/src/main/java/com/example/recordReader/TransactionsSchema.java +++ /dev/null @@ -1,168 +0,0 @@ -/** - * Copyright (c) 2016 DataTorrent, Inc. - * All rights reserved. - */ - -package com.example.recordReader; - -import java.util.Date; - -public class TransactionsSchema -{ - private String customerName; - private String customerPhone; - private String customerEmail; - private String city; - private String country; - private String uid; - private String accountNumber; - private long txId; - private Date txDate; - private double txAmount; - - public String getCustomerName() - { - return customerName; - } - - public void setCustomerName(String customerName) - { - this.customerName = customerName; - } - - public String getCustomerPhone() - { - return customerPhone; - } - - public void setCustomerPhone(String customerPhone) - { - this.customerPhone = customerPhone; - } - - public String getCustomerEmail() - { - return customerEmail; - } - - public void setCustomerEmail(String customerEmail) - { - this.customerEmail = customerEmail; - } - - public String getCity() - { - return city; - } - - public void setCity(String city) - { - this.city = city; - } - - public String getCountry() - { - return country; - } - - public void setCountry(String country) - { - this.country = country; - } - - public String getUid() - { - return uid; - } - - public void setUid(String uid) - { - this.uid = uid; - } - - public String getAccountNumber() - { - return accountNumber; - } - - public void setAccountNumber(String accountNumber) - { - this.accountNumber = accountNumber; - } - - public long getTxId() - { - return txId; - } - - public void setTxId(long txId) - { - this.txId = txId; - } - - public Date getTxDate() - { - return txDate; - } - - public void setTxDate(Date txDate) - { - this.txDate = txDate; - } - - public double getTxAmount() - { - return txAmount; - } - - public void setTxAmount(double txAmount) - { - this.txAmount = txAmount; - } - - @Override - public String toString() - { - return "TransactionsSchema [customerName=" + customerName + ", customerPhone=" + customerPhone + ", customerEmail=" - + customerEmail + ", city=" + city + ", country=" + country + ", uid=" + uid + ", accountNumber=" - + accountNumber + ", txId=" + txId + ", txDate=" + txDate + ", txAmount=" + txAmount - + "]\n"; - } - - @Override - public int hashCode() - { - final int prime = 31; - int result = 1; - result = prime * result + ((txDate == null) ? 0 : txDate.hashCode()); - result = prime * result + (int)(txId ^ (txId >>> 32)); - return result; - } - - @Override - public boolean equals(Object obj) - { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - TransactionsSchema other = (TransactionsSchema)obj; - if (txDate == null) { - if (other.txDate != null) { - return false; - } - } else if (!txDate.equals(other.txDate)) { - return false; - } - if (txId != other.txId) { - return false; - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/recordReader/src/main/java/org/apache/apex/examples/recordReader/Application.java ---------------------------------------------------------------------- diff --git a/examples/recordReader/src/main/java/org/apache/apex/examples/recordReader/Application.java b/examples/recordReader/src/main/java/org/apache/apex/examples/recordReader/Application.java new file mode 100644 index 0000000..916b061 --- /dev/null +++ b/examples/recordReader/src/main/java/org/apache/apex/examples/recordReader/Application.java @@ -0,0 +1,32 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.recordReader; + +import org.apache.apex.malhar.lib.fs.FSRecordReaderModule; +import org.apache.apex.malhar.lib.fs.GenericFileOutputOperator.StringFileOutputOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.contrib.formatter.CsvFormatter; +import com.datatorrent.contrib.parser.CsvParser; + +@ApplicationAnnotation(name="RecordReaderExample") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + FSRecordReaderModule recordReader = dag.addModule("recordReader", FSRecordReaderModule.class); + CsvParser csvParser = dag.addOperator("csvParser", CsvParser.class); + CsvFormatter formatter = dag.addOperator("formatter", new CsvFormatter()); + StringFileOutputOperator fileOutput = dag.addOperator("fileOutput", new StringFileOutputOperator()); + + dag.addStream("record", recordReader.records, csvParser.in); + dag.addStream("pojo", csvParser.out, formatter.in); + dag.addStream("string", formatter.out, fileOutput.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/recordReader/src/main/java/org/apache/apex/examples/recordReader/TransactionsSchema.java ---------------------------------------------------------------------- diff --git a/examples/recordReader/src/main/java/org/apache/apex/examples/recordReader/TransactionsSchema.java b/examples/recordReader/src/main/java/org/apache/apex/examples/recordReader/TransactionsSchema.java new file mode 100644 index 0000000..28c90f2 --- /dev/null +++ b/examples/recordReader/src/main/java/org/apache/apex/examples/recordReader/TransactionsSchema.java @@ -0,0 +1,168 @@ +/** + * Copyright (c) 2016 DataTorrent, Inc. + * All rights reserved. + */ + +package org.apache.apex.examples.recordReader; + +import java.util.Date; + +public class TransactionsSchema +{ + private String customerName; + private String customerPhone; + private String customerEmail; + private String city; + private String country; + private String uid; + private String accountNumber; + private long txId; + private Date txDate; + private double txAmount; + + public String getCustomerName() + { + return customerName; + } + + public void setCustomerName(String customerName) + { + this.customerName = customerName; + } + + public String getCustomerPhone() + { + return customerPhone; + } + + public void setCustomerPhone(String customerPhone) + { + this.customerPhone = customerPhone; + } + + public String getCustomerEmail() + { + return customerEmail; + } + + public void setCustomerEmail(String customerEmail) + { + this.customerEmail = customerEmail; + } + + public String getCity() + { + return city; + } + + public void setCity(String city) + { + this.city = city; + } + + public String getCountry() + { + return country; + } + + public void setCountry(String country) + { + this.country = country; + } + + public String getUid() + { + return uid; + } + + public void setUid(String uid) + { + this.uid = uid; + } + + public String getAccountNumber() + { + return accountNumber; + } + + public void setAccountNumber(String accountNumber) + { + this.accountNumber = accountNumber; + } + + public long getTxId() + { + return txId; + } + + public void setTxId(long txId) + { + this.txId = txId; + } + + public Date getTxDate() + { + return txDate; + } + + public void setTxDate(Date txDate) + { + this.txDate = txDate; + } + + public double getTxAmount() + { + return txAmount; + } + + public void setTxAmount(double txAmount) + { + this.txAmount = txAmount; + } + + @Override + public String toString() + { + return "TransactionsSchema [customerName=" + customerName + ", customerPhone=" + customerPhone + ", customerEmail=" + + customerEmail + ", city=" + city + ", country=" + country + ", uid=" + uid + ", accountNumber=" + + accountNumber + ", txId=" + txId + ", txDate=" + txDate + ", txAmount=" + txAmount + + "]\n"; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((txDate == null) ? 0 : txDate.hashCode()); + result = prime * result + (int)(txId ^ (txId >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + TransactionsSchema other = (TransactionsSchema)obj; + if (txDate == null) { + if (other.txDate != null) { + return false; + } + } else if (!txDate.equals(other.txDate)) { + return false; + } + if (txId != other.txId) { + return false; + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/recordReader/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/recordReader/src/main/resources/META-INF/properties.xml b/examples/recordReader/src/main/resources/META-INF/properties.xml index 06961f1..612e3fa 100644 --- a/examples/recordReader/src/main/resources/META-INF/properties.xml +++ b/examples/recordReader/src/main/resources/META-INF/properties.xml @@ -129,11 +129,11 @@ </property> <property> <name>dt.application.RecordReaderExample.operator.csvParser.port.out.attr.TUPLE_CLASS</name> - <value>com.example.recordReader.TransactionsSchema</value> + <value>org.apache.apex.examples.recordReader.TransactionsSchema</value> </property> <property> <name>dt.application.RecordReaderExample.operator.formatter.port.in.attr.TUPLE_CLASS</name> - <value>com.example.recordReader.TransactionsSchema</value> + <value>org.apache.apex.examples.recordReader.TransactionsSchema</value> </property> <property> <name>dt.application.RecordReaderExample.operator.fileOutput.prop.filePath</name> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/recordReader/src/test/java/com/example/recordReader/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/recordReader/src/test/java/com/example/recordReader/ApplicationTest.java b/examples/recordReader/src/test/java/com/example/recordReader/ApplicationTest.java deleted file mode 100644 index 222ad06..0000000 --- a/examples/recordReader/src/test/java/com/example/recordReader/ApplicationTest.java +++ /dev/null @@ -1,91 +0,0 @@ -package com.example.recordReader; - -import java.io.File; -import java.io.IOException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestWatcher; -import org.junit.runner.Description; - -import org.apache.commons.io.FileUtils; -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.LocalMode; - -/** - * Test application in local mode. - */ -public class ApplicationTest -{ - private String outputDir; - - public static class TestMeta extends TestWatcher - { - public String baseDirectory; - - @Override - protected void starting(org.junit.runner.Description description) - { - this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName(); - } - - @Override - protected void finished(Description description) - { - super.finished(description); - try { - FileUtils.forceDelete(new File(baseDirectory)); - } catch (IOException e) { - e.printStackTrace(); - } - } - - } - - @Rule - public TestMeta testMeta = new TestMeta(); - - @Before - public void setup() throws Exception - { - outputDir = testMeta.baseDirectory + File.separator + "output"; - } - - @Test - public void testApplication() throws IOException, Exception - { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - conf.set("dt.application.RecordReaderExample.operator.fileOutput.prop.filePath", outputDir); - File outputfile = FileUtils.getFile(outputDir, "output.txt_5.0"); - - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.runAsync(); - - // wait for tuples to show up - while (!outputfile.exists()) { - System.out.println("Sleeping ...."); - Thread.sleep(1000); - } - - lc.shutdown(); - Assert.assertTrue( - FileUtils.contentEquals( - FileUtils.getFile( - conf.get("dt.application.RecordReaderExample.operator.recordReader.prop.files") - ),outputfile)); - - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/recordReader/src/test/java/org/apache/apex/examples/recordReader/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/recordReader/src/test/java/org/apache/apex/examples/recordReader/ApplicationTest.java b/examples/recordReader/src/test/java/org/apache/apex/examples/recordReader/ApplicationTest.java new file mode 100644 index 0000000..7e133ff --- /dev/null +++ b/examples/recordReader/src/test/java/org/apache/apex/examples/recordReader/ApplicationTest.java @@ -0,0 +1,91 @@ +package org.apache.apex.examples.recordReader; + +import java.io.File; +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.LocalMode; + +/** + * Test application in local mode. + */ +public class ApplicationTest +{ + private String outputDir; + + public static class TestMeta extends TestWatcher + { + public String baseDirectory; + + @Override + protected void starting(org.junit.runner.Description description) + { + this.baseDirectory = "target/" + description.getClassName() + "/" + description.getMethodName(); + } + + @Override + protected void finished(Description description) + { + super.finished(description); + try { + FileUtils.forceDelete(new File(baseDirectory)); + } catch (IOException e) { + e.printStackTrace(); + } + } + + } + + @Rule + public TestMeta testMeta = new TestMeta(); + + @Before + public void setup() throws Exception + { + outputDir = testMeta.baseDirectory + File.separator + "output"; + } + + @Test + public void testApplication() throws IOException, Exception + { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + conf.set("dt.application.RecordReaderExample.operator.fileOutput.prop.filePath", outputDir); + File outputfile = FileUtils.getFile(outputDir, "output.txt_5.0"); + + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.runAsync(); + + // wait for tuples to show up + while (!outputfile.exists()) { + System.out.println("Sleeping ...."); + Thread.sleep(1000); + } + + lc.shutdown(); + Assert.assertTrue( + FileUtils.contentEquals( + FileUtils.getFile( + conf.get("dt.application.RecordReaderExample.operator.recordReader.prop.files") + ),outputfile)); + + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/pom.xml ---------------------------------------------------------------------- diff --git a/examples/throttle/pom.xml b/examples/throttle/pom.xml index 02fe22e..d312268 100644 --- a/examples/throttle/pom.xml +++ b/examples/throttle/pom.xml @@ -1,263 +1,23 @@ <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> - <groupId>com.datatorrent.examples</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>throttle</artifactId> + <artifactId>malhar-examples-throttle</artifactId> <packaging>jar</packaging> <!-- change these to the appropriate values --> <name>Throttle Application</name> <description>Application demonstrating throttling input when downstream is slower</description> - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - <dependencies> <!-- add your dependencies here --> <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>3.6.0</version> - <!-- - If you know that your application does not need transitive dependencies pulled in by malhar-library, - uncomment the following to reduce the size of your app package. - --> - <!-- - <exclusions> - <exclusion> - <groupId>*</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> - --> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> @@ -266,7 +26,7 @@ <dependency> <groupId>org.apache.apex</groupId> <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> + <version>${apex.core.version}</version> <scope>test</scope> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/com/datatorrent/examples/throttle/Application.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/Application.java b/examples/throttle/src/main/java/com/datatorrent/examples/throttle/Application.java deleted file mode 100644 index d789dbe..0000000 --- a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/Application.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Put your copyright and license info here. - */ -package com.datatorrent.examples.throttle; - -import java.util.Collection; - -import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.Lists; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StatsListener; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; - -@ApplicationAnnotation(name="ThrottleApplication") -public class Application implements StreamingApplication -{ - - @Override - public void populateDAG(DAG dag, Configuration conf) - { - // Creating an example application with three operators - // The last operator is slowing down the DAG - // With the use of the stats listener the input operator is slowed when the window difference crosses a threshold - - RandomNumberGenerator randomGenerator = dag.addOperator("RandomGenerator", RandomNumberGenerator.class); - PassThroughOperator<Double> passThrough = dag.addOperator("PassThrough", PassThroughOperator.class); - SlowDevNullOperator<Double> devNull = dag.addOperator("SlowNull", SlowDevNullOperator.class); - - // Important to use the same stats listener object for all operators so that we can centrally collect stats and make - // the decision - StatsListener statsListener = new ThrottlingStatsListener(); - Collection<StatsListener> statsListeners = Lists.newArrayList(statsListener); - dag.setAttribute(randomGenerator, Context.OperatorContext.STATS_LISTENERS, statsListeners); - dag.setAttribute(passThrough, Context.OperatorContext.STATS_LISTENERS, statsListeners); - dag.setAttribute(devNull, Context.OperatorContext.STATS_LISTENERS, statsListeners); - - // Increase timeout for the slow operator, this specifies the maximum timeout for an operator to process a window - // It is specified in number of windows, since 1 window is 500ms, 30 minutes is 30 * 60 * 2 = 3600 windows - dag.setAttribute(devNull, Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 3600); - - // If there are unifiers that are slow then set timeout for them - // dag.setUnifierAttribute(passThrough.output, Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 3600); - - dag.addStream("randomData", randomGenerator.out, passThrough.input); - dag.addStream("passData", passThrough.output, devNull.input); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/com/datatorrent/examples/throttle/PassThroughOperator.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/PassThroughOperator.java b/examples/throttle/src/main/java/com/datatorrent/examples/throttle/PassThroughOperator.java deleted file mode 100644 index b4630df..0000000 --- a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/PassThroughOperator.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.datatorrent.examples.throttle; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.common.util.BaseOperator; - -/** - * Created by pramod on 9/27/16. - */ -public class PassThroughOperator<T> extends BaseOperator { - - public transient final DefaultInputPort<T> input = new DefaultInputPort<T>() { - @Override - public void process(T t) { - output.emit(t); - } - }; - - public transient final DefaultOutputPort<T> output = new DefaultOutputPort<>(); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/com/datatorrent/examples/throttle/RandomNumberGenerator.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/RandomNumberGenerator.java b/examples/throttle/src/main/java/com/datatorrent/examples/throttle/RandomNumberGenerator.java deleted file mode 100644 index ea57b6d..0000000 --- a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/RandomNumberGenerator.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Put your copyright and license info here. - */ -package com.datatorrent.examples.throttle; - -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.common.util.BaseOperator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This is a simple operator that emits random number. - */ -public class RandomNumberGenerator extends BaseOperator implements InputOperator -{ - private int numTuples = 1000; - private int origNumTuples = numTuples; - private transient int count = 0; - - private static final Logger logger = LoggerFactory.getLogger(RandomNumberGenerator.class); - - public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>(); - - @Override - public void beginWindow(long windowId) - { - count = 0; - } - - @Override - public void emitTuples() - { - if (count++ < numTuples) { - out.emit(Math.random()); - } - } - - // Simple suspend and - public void suspend() { - logger.debug("Slowing down"); - numTuples = 0; - } - - public void normal() { - logger.debug("Normal"); - numTuples = origNumTuples; - } - - public int getNumTuples() - { - return numTuples; - } - - /** - * Sets the number of tuples to be emitted every window. - * @param numTuples number of tuples - */ - public void setNumTuples(int numTuples) - { - this.numTuples = numTuples; - this.origNumTuples = numTuples; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/com/datatorrent/examples/throttle/SlowDevNullOperator.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/SlowDevNullOperator.java b/examples/throttle/src/main/java/com/datatorrent/examples/throttle/SlowDevNullOperator.java deleted file mode 100644 index 7d1451d..0000000 --- a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/SlowDevNullOperator.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.datatorrent.examples.throttle; - -import com.google.common.base.Throwables; - -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.common.util.BaseOperator; - -/** - * Created by pramod on 9/27/16. - */ -public class SlowDevNullOperator<T> extends BaseOperator { - - // Modify sleep time dynamically while app is running to increase and decrease sleep time - long sleepTime = 1; - - public transient final DefaultInputPort<T> input = new DefaultInputPort<T>() { - @Override - public void process(T t) { - // Introduce an artificial delay for every tuple - try { - Thread.sleep(sleepTime); - } catch (InterruptedException e) { - throw Throwables.propagate(e); - } - } - }; - - public long getSleepTime() { - return sleepTime; - } - - public void setSleepTime(long sleepTime) { - this.sleepTime = sleepTime; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/com/datatorrent/examples/throttle/ThrottlingStatsListener.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/ThrottlingStatsListener.java b/examples/throttle/src/main/java/com/datatorrent/examples/throttle/ThrottlingStatsListener.java deleted file mode 100644 index 46e2e0e..0000000 --- a/examples/throttle/src/main/java/com/datatorrent/examples/throttle/ThrottlingStatsListener.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.datatorrent.examples.throttle; - -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Maps; - -import com.datatorrent.api.Operator; -import com.datatorrent.api.StatsListener; - -/** - * Created by pramod on 9/27/16. - */ -public class ThrottlingStatsListener implements StatsListener, Serializable { - - private static final Logger logger = LoggerFactory.getLogger(ThrottlingStatsListener.class); - - // Slowdown input if the window difference between operators increases beyond this value - long maxThreshold = 100; - // restore input operator to normal speed if the window difference falls below this threshold - long minThreshold = 100; - - Map<Integer, ThrottleState> throttleStates = Maps.newHashMap(); - - static class ThrottleState { - // The current state of the operator, normal or throttled - boolean normal = true; - //The latest window id for which stats were received for the operator - long currentWindowId; - } - - // This method runs on the app master side and is called whenever new stats are received from the operators - @Override - public Response processStats(BatchedOperatorStats batchedOperatorStats) - { - Response response = new Response(); - int operatorId = batchedOperatorStats.getOperatorId(); - - ThrottleState throttleState = throttleStates.get(operatorId); - if (throttleState == null) { - throttleState = new ThrottleState(); - throttleStates.put(operatorId, throttleState); - } - - long windowId = batchedOperatorStats.getCurrentWindowId(); - throttleState.currentWindowId = windowId; - - // Find min and max window to compute difference - long minWindow = Long.MAX_VALUE; - long maxWindow = Long.MIN_VALUE; - for (ThrottleState state : throttleStates.values()) { - if (state.currentWindowId < minWindow) minWindow = state.currentWindowId; - if (state.currentWindowId > maxWindow) maxWindow = state.currentWindowId; - } - logger.debug("Operator {} min window {} max window {}", operatorId, minWindow, maxWindow); - - if (throttleState.normal && ((maxWindow - minWindow) > maxThreshold)) { - // Send request to operator to slow down - logger.info("Sending suspend request"); - List<OperatorRequest> operatorRequests = new ArrayList<OperatorRequest>(); - operatorRequests.add(new InputSlowdownRequest()); - response.operatorRequests = operatorRequests; - //logger.info("Setting suspend"); - throttleState.normal = false; - } else if (!throttleState.normal && ((maxWindow - minWindow) <= minThreshold)) { - // Send request to operator to get back to normal - logger.info("Sending normal request"); - List<OperatorRequest> operatorRequests = new ArrayList<OperatorRequest>(); - operatorRequests.add(new InputNormalRequest()); - response.operatorRequests = operatorRequests; - //logger.info("Setting normal"); - throttleState.normal = true; - } - - return response; - } - - // This runs on the operator side - public static class InputSlowdownRequest implements OperatorRequest, Serializable - { - private static final Logger logger = LoggerFactory.getLogger(InputSlowdownRequest.class); - - @Override - public OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException - { - logger.debug("Received slowdown operator {} operatorId {} windowId {}", operator, operatorId, windowId); - if (operator instanceof RandomNumberGenerator) { - RandomNumberGenerator generator = (RandomNumberGenerator)operator; - generator.suspend(); - } - return new InputOperatorResponse(); - } - } - - public static class InputNormalRequest implements OperatorRequest, Serializable - { - private static final Logger logger = LoggerFactory.getLogger(InputNormalRequest.class); - - @Override - public OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException - { - logger.debug("Received normal operator {} operatorId {} windowId {}", operator, operatorId, windowId); - if (operator instanceof RandomNumberGenerator) { - RandomNumberGenerator generator = (RandomNumberGenerator)operator; - generator.normal(); - } - return new InputOperatorResponse(); - } - } - - public static class InputOperatorResponse implements OperatorResponse, Serializable - { - - @Override - public Object getResponseId() { - return 1; - } - - @Override - public Object getResponse() { - return ""; - } - } - - public long getMaxThreshold() - { - return maxThreshold; - } - - public void setMaxThreshold(long maxThreshold) - { - this.maxThreshold = maxThreshold; - } - - public long getMinThreshold() - { - return minThreshold; - } - - public void setMinThreshold(long minThreshold) - { - this.minThreshold = minThreshold; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/org/apache/apex/examples/throttle/Application.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/org/apache/apex/examples/throttle/Application.java b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/Application.java new file mode 100644 index 0000000..9adad32 --- /dev/null +++ b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/Application.java @@ -0,0 +1,51 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.throttle; + +import java.util.Collection; + +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StatsListener; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +@ApplicationAnnotation(name="ThrottleApplication") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // Creating an example application with three operators + // The last operator is slowing down the DAG + // With the use of the stats listener the input operator is slowed when the window difference crosses a threshold + + RandomNumberGenerator randomGenerator = dag.addOperator("RandomGenerator", RandomNumberGenerator.class); + PassThroughOperator<Double> passThrough = dag.addOperator("PassThrough", PassThroughOperator.class); + SlowDevNullOperator<Double> devNull = dag.addOperator("SlowNull", SlowDevNullOperator.class); + + // Important to use the same stats listener object for all operators so that we can centrally collect stats and make + // the decision + StatsListener statsListener = new ThrottlingStatsListener(); + Collection<StatsListener> statsListeners = Lists.newArrayList(statsListener); + dag.setAttribute(randomGenerator, Context.OperatorContext.STATS_LISTENERS, statsListeners); + dag.setAttribute(passThrough, Context.OperatorContext.STATS_LISTENERS, statsListeners); + dag.setAttribute(devNull, Context.OperatorContext.STATS_LISTENERS, statsListeners); + + // Increase timeout for the slow operator, this specifies the maximum timeout for an operator to process a window + // It is specified in number of windows, since 1 window is 500ms, 30 minutes is 30 * 60 * 2 = 3600 windows + dag.setAttribute(devNull, Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 3600); + + // If there are unifiers that are slow then set timeout for them + // dag.setUnifierAttribute(passThrough.output, Context.OperatorContext.TIMEOUT_WINDOW_COUNT, 3600); + + dag.addStream("randomData", randomGenerator.out, passThrough.input); + dag.addStream("passData", passThrough.output, devNull.input); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/org/apache/apex/examples/throttle/PassThroughOperator.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/org/apache/apex/examples/throttle/PassThroughOperator.java b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/PassThroughOperator.java new file mode 100644 index 0000000..bf0fbce --- /dev/null +++ b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/PassThroughOperator.java @@ -0,0 +1,20 @@ +package org.apache.apex.examples.throttle; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * Created by pramod on 9/27/16. + */ +public class PassThroughOperator<T> extends BaseOperator { + + public transient final DefaultInputPort<T> input = new DefaultInputPort<T>() { + @Override + public void process(T t) { + output.emit(t); + } + }; + + public transient final DefaultOutputPort<T> output = new DefaultOutputPort<>(); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/org/apache/apex/examples/throttle/RandomNumberGenerator.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/org/apache/apex/examples/throttle/RandomNumberGenerator.java b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/RandomNumberGenerator.java new file mode 100644 index 0000000..6825340 --- /dev/null +++ b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/RandomNumberGenerator.java @@ -0,0 +1,64 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.throttle; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is a simple operator that emits random number. + */ +public class RandomNumberGenerator extends BaseOperator implements InputOperator +{ + private int numTuples = 1000; + private int origNumTuples = numTuples; + private transient int count = 0; + + private static final Logger logger = LoggerFactory.getLogger(RandomNumberGenerator.class); + + public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>(); + + @Override + public void beginWindow(long windowId) + { + count = 0; + } + + @Override + public void emitTuples() + { + if (count++ < numTuples) { + out.emit(Math.random()); + } + } + + // Simple suspend and + public void suspend() { + logger.debug("Slowing down"); + numTuples = 0; + } + + public void normal() { + logger.debug("Normal"); + numTuples = origNumTuples; + } + + public int getNumTuples() + { + return numTuples; + } + + /** + * Sets the number of tuples to be emitted every window. + * @param numTuples number of tuples + */ + public void setNumTuples(int numTuples) + { + this.numTuples = numTuples; + this.origNumTuples = numTuples; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/org/apache/apex/examples/throttle/SlowDevNullOperator.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/org/apache/apex/examples/throttle/SlowDevNullOperator.java b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/SlowDevNullOperator.java new file mode 100644 index 0000000..b49b3c8 --- /dev/null +++ b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/SlowDevNullOperator.java @@ -0,0 +1,35 @@ +package org.apache.apex.examples.throttle; + +import com.google.common.base.Throwables; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * Created by pramod on 9/27/16. + */ +public class SlowDevNullOperator<T> extends BaseOperator { + + // Modify sleep time dynamically while app is running to increase and decrease sleep time + long sleepTime = 1; + + public transient final DefaultInputPort<T> input = new DefaultInputPort<T>() { + @Override + public void process(T t) { + // Introduce an artificial delay for every tuple + try { + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + throw Throwables.propagate(e); + } + } + }; + + public long getSleepTime() { + return sleepTime; + } + + public void setSleepTime(long sleepTime) { + this.sleepTime = sleepTime; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/main/java/org/apache/apex/examples/throttle/ThrottlingStatsListener.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/main/java/org/apache/apex/examples/throttle/ThrottlingStatsListener.java b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/ThrottlingStatsListener.java new file mode 100644 index 0000000..7e9e70f --- /dev/null +++ b/examples/throttle/src/main/java/org/apache/apex/examples/throttle/ThrottlingStatsListener.java @@ -0,0 +1,150 @@ +package org.apache.apex.examples.throttle; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.Operator; +import com.datatorrent.api.StatsListener; + +/** + * Created by pramod on 9/27/16. + */ +public class ThrottlingStatsListener implements StatsListener, Serializable { + + private static final Logger logger = LoggerFactory.getLogger(ThrottlingStatsListener.class); + + // Slowdown input if the window difference between operators increases beyond this value + long maxThreshold = 100; + // restore input operator to normal speed if the window difference falls below this threshold + long minThreshold = 100; + + Map<Integer, ThrottleState> throttleStates = Maps.newHashMap(); + + static class ThrottleState { + // The current state of the operator, normal or throttled + boolean normal = true; + //The latest window id for which stats were received for the operator + long currentWindowId; + } + + // This method runs on the app master side and is called whenever new stats are received from the operators + @Override + public Response processStats(BatchedOperatorStats batchedOperatorStats) + { + Response response = new Response(); + int operatorId = batchedOperatorStats.getOperatorId(); + + ThrottleState throttleState = throttleStates.get(operatorId); + if (throttleState == null) { + throttleState = new ThrottleState(); + throttleStates.put(operatorId, throttleState); + } + + long windowId = batchedOperatorStats.getCurrentWindowId(); + throttleState.currentWindowId = windowId; + + // Find min and max window to compute difference + long minWindow = Long.MAX_VALUE; + long maxWindow = Long.MIN_VALUE; + for (ThrottleState state : throttleStates.values()) { + if (state.currentWindowId < minWindow) minWindow = state.currentWindowId; + if (state.currentWindowId > maxWindow) maxWindow = state.currentWindowId; + } + logger.debug("Operator {} min window {} max window {}", operatorId, minWindow, maxWindow); + + if (throttleState.normal && ((maxWindow - minWindow) > maxThreshold)) { + // Send request to operator to slow down + logger.info("Sending suspend request"); + List<OperatorRequest> operatorRequests = new ArrayList<OperatorRequest>(); + operatorRequests.add(new InputSlowdownRequest()); + response.operatorRequests = operatorRequests; + //logger.info("Setting suspend"); + throttleState.normal = false; + } else if (!throttleState.normal && ((maxWindow - minWindow) <= minThreshold)) { + // Send request to operator to get back to normal + logger.info("Sending normal request"); + List<OperatorRequest> operatorRequests = new ArrayList<OperatorRequest>(); + operatorRequests.add(new InputNormalRequest()); + response.operatorRequests = operatorRequests; + //logger.info("Setting normal"); + throttleState.normal = true; + } + + return response; + } + + // This runs on the operator side + public static class InputSlowdownRequest implements OperatorRequest, Serializable + { + private static final Logger logger = LoggerFactory.getLogger(InputSlowdownRequest.class); + + @Override + public OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException + { + logger.debug("Received slowdown operator {} operatorId {} windowId {}", operator, operatorId, windowId); + if (operator instanceof RandomNumberGenerator) { + RandomNumberGenerator generator = (RandomNumberGenerator)operator; + generator.suspend(); + } + return new InputOperatorResponse(); + } + } + + public static class InputNormalRequest implements OperatorRequest, Serializable + { + private static final Logger logger = LoggerFactory.getLogger(InputNormalRequest.class); + + @Override + public OperatorResponse execute(Operator operator, int operatorId, long windowId) throws IOException + { + logger.debug("Received normal operator {} operatorId {} windowId {}", operator, operatorId, windowId); + if (operator instanceof RandomNumberGenerator) { + RandomNumberGenerator generator = (RandomNumberGenerator)operator; + generator.normal(); + } + return new InputOperatorResponse(); + } + } + + public static class InputOperatorResponse implements OperatorResponse, Serializable + { + + @Override + public Object getResponseId() { + return 1; + } + + @Override + public Object getResponse() { + return ""; + } + } + + public long getMaxThreshold() + { + return maxThreshold; + } + + public void setMaxThreshold(long maxThreshold) + { + this.maxThreshold = maxThreshold; + } + + public long getMinThreshold() + { + return minThreshold; + } + + public void setMinThreshold(long minThreshold) + { + this.minThreshold = minThreshold; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/test/java/com/datatorrent/examples/throttle/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/test/java/com/datatorrent/examples/throttle/ApplicationTest.java b/examples/throttle/src/test/java/com/datatorrent/examples/throttle/ApplicationTest.java deleted file mode 100644 index 02a96ba..0000000 --- a/examples/throttle/src/test/java/com/datatorrent/examples/throttle/ApplicationTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Put your copyright and license info here. - */ -package com.datatorrent.examples.throttle; - -import java.io.IOException; - -import javax.validation.ConstraintViolationException; - -import org.junit.Assert; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - -import com.datatorrent.api.LocalMode; -import com.datatorrent.examples.throttle.Application; - -/** - * Test the DAG declaration in local mode. - */ -public class ApplicationTest { - - @Test - public void testApplication() throws IOException, Exception { - try { - LocalMode lma = LocalMode.newInstance(); - Configuration conf = new Configuration(false); - conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); - lma.prepareDAG(new Application(), conf); - LocalMode.Controller lc = lma.getController(); - lc.run(10000); // runs for 10 seconds and quits - } catch (ConstraintViolationException e) { - Assert.fail("constraint violations: " + e.getConstraintViolations()); - } - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/throttle/src/test/java/org/apache/apex/examples/throttle/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/throttle/src/test/java/org/apache/apex/examples/throttle/ApplicationTest.java b/examples/throttle/src/test/java/org/apache/apex/examples/throttle/ApplicationTest.java new file mode 100644 index 0000000..e85c5c5 --- /dev/null +++ b/examples/throttle/src/test/java/org/apache/apex/examples/throttle/ApplicationTest.java @@ -0,0 +1,36 @@ +/** + * Put your copyright and license info here. + */ +package org.apache.apex.examples.throttle; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import com.datatorrent.api.LocalMode; + +/** + * Test the DAG declaration in local mode. + */ +public class ApplicationTest { + + @Test + public void testApplication() throws IOException, Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(10000); // runs for 10 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/pom.xml ---------------------------------------------------------------------- diff --git a/examples/transform/pom.xml b/examples/transform/pom.xml index e8846d3..93dbad1 100644 --- a/examples/transform/pom.xml +++ b/examples/transform/pom.xml @@ -2,250 +2,21 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>com.example</groupId> - <version>1.0-SNAPSHOT</version> - <artifactId>transform</artifactId> + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.7.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-transform</artifactId> <packaging>jar</packaging> <name>Transform Application</name> <description>Sample application for transform operator</description> - <properties> - <!-- change this if you desire to use a different version of Apex Core --> - <apex.version>3.5.0</apex.version> - <malhar.version>3.6.0</malhar.version> - <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> - </properties> - - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-eclipse-plugin</artifactId> - <version>2.9</version> - <configuration> - <downloadSources>true</downloadSources> - </configuration> - </plugin> - <plugin> - <artifactId>maven-compiler-plugin</artifactId> - <version>3.3</version> - <configuration> - <encoding>UTF-8</encoding> - <source>1.7</source> - <target>1.7</target> - <debug>true</debug> - <optimize>false</optimize> - <showDeprecation>true</showDeprecation> - <showWarnings>true</showWarnings> - </configuration> - </plugin> - <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <version>2.8</version> - <executions> - <execution> - <id>copy-dependencies</id> - <phase>prepare-package</phase> - <goals> - <goal>copy-dependencies</goal> - </goals> - <configuration> - <outputDirectory>target/deps</outputDirectory> - <includeScope>runtime</includeScope> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <executions> - <execution> - <id>app-package-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - <configuration> - <finalName>${project.artifactId}-${project.version}-apexapp</finalName> - <appendAssemblyId>false</appendAssemblyId> - <descriptors> - <descriptor>src/assemble/appPackage.xml</descriptor> - </descriptors> - <archiverConfig> - <defaultDirectoryMode>0755</defaultDirectoryMode> - </archiverConfig> - <archive> - <manifestEntries> - <Class-Path>${apex.apppackage.classpath}</Class-Path> - <DT-Engine-Version>${apex.version}</DT-Engine-Version> - <DT-App-Package-Group-Id>${project.groupId}</DT-App-Package-Group-Id> - <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> - <DT-App-Package-Version>${project.version}</DT-App-Package-Version> - <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> - <DT-App-Package-Description>${project.description}</DT-App-Package-Description> - </manifestEntries> - </archive> - </configuration> - </execution> - </executions> - </plugin> - - <plugin> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.7</version> - <executions> - <execution> - <phase>package</phase> - <configuration> - <target> - <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" - tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - <execution> - <!-- create resource directory for xml javadoc--> - <id>createJavadocDirectory</id> - <phase>generate-resources</phase> - <configuration> - <tasks> - <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> - <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> - </tasks> - </configuration> - <goals> - <goal>run</goal> - </goals> - </execution> - </executions> - </plugin> - - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>build-helper-maven-plugin</artifactId> - <version>1.9.1</version> - <executions> - <execution> - <id>attach-artifacts</id> - <phase>package</phase> - <goals> - <goal>attach-artifact</goal> - </goals> - <configuration> - <artifacts> - <artifact> - <file>target/${project.artifactId}-${project.version}.apa</file> - <type>apa</type> - </artifact> - </artifacts> - <skipAttach>false</skipAttach> - </configuration> - </execution> - </executions> - </plugin> - - <!-- generate javdoc --> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-javadoc-plugin</artifactId> - <executions> - <!-- generate xml javadoc --> - <execution> - <id>xml-doclet</id> - <phase>generate-resources</phase> - <goals> - <goal>javadoc</goal> - </goals> - <configuration> - <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> - <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> - <useStandardDocletOptions>false</useStandardDocletOptions> - <docletArtifact> - <groupId>com.github.markusbernhardt</groupId> - <artifactId>xml-doclet</artifactId> - <version>1.0.4</version> - </docletArtifact> - </configuration> - </execution> - </executions> - </plugin> - <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> - <plugin> - <groupId>org.codehaus.mojo</groupId> - <artifactId>xml-maven-plugin</artifactId> - <version>1.0</version> - <executions> - <execution> - <id>transform-xmljavadoc</id> - <phase>generate-resources</phase> - <goals> - <goal>transform</goal> - </goals> - </execution> - </executions> - <configuration> - <transformationSets> - <transformationSet> - <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> - <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> - </transformationSet> - </transformationSets> - </configuration> - </plugin> - <!-- copy xml javadoc to class jar --> - <plugin> - <artifactId>maven-resources-plugin</artifactId> - <version>2.6</version> - <executions> - <execution> - <id>copy-resources</id> - <phase>process-resources</phase> - <goals> - <goal>copy-resources</goal> - </goals> - <configuration> - <outputDirectory>${basedir}/target/classes</outputDirectory> - <resources> - <resource> - <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> - <includes> - <include>${project.artifactId}-${project.version}-javadoc.xml</include> - </includes> - <filtering>true</filtering> - </resource> - </resources> - </configuration> - </execution> - </executions> - </plugin> - - </plugins> - - </build> - <dependencies> <!-- add your dependencies here --> <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>malhar-library</artifactId> - <version>${malhar.version}</version> - </dependency> - <dependency> - <groupId>org.apache.apex</groupId> - <artifactId>apex-common</artifactId> - <version>${apex.version}</version> - <scope>provided</scope> - </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> @@ -254,7 +25,7 @@ <dependency> <groupId>org.apache.apex</groupId> <artifactId>apex-engine</artifactId> - <version>${apex.version}</version> + <version>${apex.core.version}</version> <scope>test</scope> </dependency> <dependency> @@ -262,6 +33,11 @@ <artifactId>janino</artifactId> <version>2.7.8</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.1</version> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/com/example/transform/Application.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/com/example/transform/Application.java b/examples/transform/src/main/java/com/example/transform/Application.java deleted file mode 100644 index d73b47d..0000000 --- a/examples/transform/src/main/java/com/example/transform/Application.java +++ /dev/null @@ -1,39 +0,0 @@ -package com.example.transform; - -import java.util.HashMap; -import java.util.Map; - -import org.apache.hadoop.conf.Configuration; - -import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; -import com.datatorrent.api.StreamingApplication; -import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.common.partitioner.StatelessPartitioner; -import com.datatorrent.lib.io.ConsoleOutputOperator; -import com.datatorrent.lib.transform.TransformOperator; - -@ApplicationAnnotation(name="TransformExample") -public class Application implements StreamingApplication -{ - @Override - public void populateDAG(DAG dag, Configuration conf) - { - POJOGenerator input = dag.addOperator("Input", new POJOGenerator()); - TransformOperator transform = dag.addOperator("Process", new TransformOperator()); - // Set expression map - Map<String, String> expMap = new HashMap<>(); - expMap.put("name", "{$.firstName}.concat(\" \").concat({$.lastName})"); - expMap.put("age", "(new java.util.Date()).getYear() - {$.dateOfBirth}.getYear()"); - expMap.put("address", "{$.address}.toLowerCase()"); - transform.setExpressionMap(expMap); - ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator()); - - dag.addStream("InputToTransform", input.output, transform.input); - dag.addStream("TransformToOutput", transform.output, output.input); - - dag.setInputPortAttribute(transform.input, Context.PortContext.TUPLE_CLASS, CustomerEvent.class); - dag.setOutputPortAttribute(transform.output, Context.PortContext.TUPLE_CLASS, CustomerInfo.class); - dag.setAttribute(transform, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<TransformOperator>(2)); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/com/example/transform/CustomerEvent.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/com/example/transform/CustomerEvent.java b/examples/transform/src/main/java/com/example/transform/CustomerEvent.java deleted file mode 100644 index 14a9c82..0000000 --- a/examples/transform/src/main/java/com/example/transform/CustomerEvent.java +++ /dev/null @@ -1,74 +0,0 @@ -package com.example.transform; - -import java.util.Date; - -public class CustomerEvent -{ - private int customerId; - private String firstName; - private String lastName; - private Date dateOfBirth; - private String address; - - public int getCustomerId() - { - return customerId; - } - - public void setCustomerId(int customerId) - { - this.customerId = customerId; - } - - public String getFirstName() - { - return firstName; - } - - public void setFirstName(String firstName) - { - this.firstName = firstName; - } - - public String getLastName() - { - return lastName; - } - - public void setLastName(String lastName) - { - this.lastName = lastName; - } - - public Date getDateOfBirth() - { - return dateOfBirth; - } - - public void setDateOfBirth(Date dateOfBirth) - { - this.dateOfBirth = dateOfBirth; - } - - public String getAddress() - { - return address; - } - - public void setAddress(String address) - { - this.address = address; - } - - @Override - public String toString() - { - return "CustomerEvent{" + - "customerId=" + customerId + - ", firstName='" + firstName + '\'' + - ", lastName='" + lastName + '\'' + - ", dateOfBirth=" + dateOfBirth + - ", address='" + address + '\'' + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9c154f20/examples/transform/src/main/java/com/example/transform/CustomerInfo.java ---------------------------------------------------------------------- diff --git a/examples/transform/src/main/java/com/example/transform/CustomerInfo.java b/examples/transform/src/main/java/com/example/transform/CustomerInfo.java deleted file mode 100644 index 28a9ccb..0000000 --- a/examples/transform/src/main/java/com/example/transform/CustomerInfo.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.example.transform; - -public class CustomerInfo -{ - private int customerId; - private String name; - private int age; - private String address; - - public int getCustomerId() - { - return customerId; - } - - public void setCustomerId(int customerId) - { - this.customerId = customerId; - } - - public String getName() - { - return name; - } - - public void setName(String name) - { - this.name = name; - } - - public int getAge() - { - return age; - } - - public void setAge(int age) - { - this.age = age; - } - - public String getAddress() - { - return address; - } - - public void setAddress(String address) - { - this.address = address; - } - - @Override - public String toString() - { - return "CustomerInfo{" + - "customerId=" + customerId + - ", name='" + name + '\'' + - ", age=" + age + - ", address='" + address + '\'' + - '}'; - } -}
