Add example of custom partitioner and stream codec Update index and revise some README files
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f7c7b7cf Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f7c7b7cf Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f7c7b7cf Branch: refs/heads/master Commit: f7c7b7cf476e0fe7ee82ffd85a2ed14a82cbcaed Parents: 041af06 Author: Munagala V. Ramanath <[email protected]> Authored: Sun Mar 13 17:37:45 2016 -0700 Committer: Lakshmi Prasanna Velineni <[email protected]> Committed: Sun Mar 26 11:43:48 2017 -0700 ---------------------------------------------------------------------- examples/partition/README.md | 12 + .../partition/XmlJavadocCommentsExtractor.xsl | 44 +++ examples/partition/pom.xml | 274 +++++++++++++++++++ examples/partition/src/assemble/appPackage.xml | 43 +++ .../java/com/example/myapexapp/Application.java | 27 ++ .../main/java/com/example/myapexapp/Codec3.java | 13 + .../myapexapp/RandomNumberGenerator.java | 83 ++++++ .../com/example/myapexapp/TestPartition.java | 164 +++++++++++ .../src/main/resources/META-INF/properties.xml | 33 +++ .../src/main/resources/my-log4j.properties | 16 ++ .../com/example/myapexapp/ApplicationTest.java | 37 +++ .../src/test/resources/log4j.properties | 21 ++ 12 files changed, 767 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/README.md ---------------------------------------------------------------------- diff --git a/examples/partition/README.md b/examples/partition/README.md new file mode 100644 index 0000000..6a46599 --- /dev/null +++ b/examples/partition/README.md @@ -0,0 +1,12 @@ +This example shows how to define custom partitions and a custom `StreamCodec` to customize +the set of tuples that reach each partition. + +There are two operators: `RandomNumberGenerator` (generates random integers) and +`TestPartition` (logs input tuples). + +The application also uses a StreamCodec called `Codec3` to tag each tuple with a +partition tag based on whether the number is divisible by 2 or 4. + +`TestPartition` has code to create 3 partitions: one gets odd numbers, one gets multiples +of 4 and the last gets the rest. The `PartitionKeys` associated with each partition use +the partition tag to select the set of tuples to be handled by that partition. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/XmlJavadocCommentsExtractor.xsl ---------------------------------------------------------------------- diff --git a/examples/partition/XmlJavadocCommentsExtractor.xsl b/examples/partition/XmlJavadocCommentsExtractor.xsl new file mode 100644 index 0000000..08075a9 --- /dev/null +++ b/examples/partition/XmlJavadocCommentsExtractor.xsl @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> + +<!-- + Document : XmlJavadocCommentsExtractor.xsl + Created on : September 16, 2014, 11:30 AM + Description: + The transformation strips off all information except for comments and tags from xml javadoc generated by xml-doclet. +--> + +<xsl:stylesheet xmlns:xsl="http://www.w3.org/1999/XSL/Transform" version="1.0"> + <xsl:output method="xml" standalone="yes"/> + + <!-- copy xml by selecting only the following nodes, attributes and text --> + <xsl:template match="node()|text()|@*"> + <xsl:copy> + <xsl:apply-templates select="root|package|class|interface|method|field|type|comment|tag|text()|@name|@qualified|@text"/> + </xsl:copy> + </xsl:template> + + <!-- Strip off the following paths from the selected xml --> + <xsl:template match="//root/package/interface/interface + |//root/package/interface/method/@qualified + |//root/package/class/interface + |//root/package/class/class + |//root/package/class/method/@qualified + |//root/package/class/field/@qualified" /> + + <xsl:strip-space elements="*"/> +</xsl:stylesheet> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/pom.xml ---------------------------------------------------------------------- diff --git a/examples/partition/pom.xml b/examples/partition/pom.xml new file mode 100644 index 0000000..ac15981 --- /dev/null +++ b/examples/partition/pom.xml @@ -0,0 +1,274 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <groupId>com.example</groupId> + <version>1.0-SNAPSHOT</version> + <artifactId>Test-Ram</artifactId> + <packaging>jar</packaging> + + <!-- change these to the appropriate values --> + <name>Test_ram</name> + <description>Test_ram</description> + + <properties> + <!-- change this if you desire to use a different version of Apex core --> + <apex.version>3.5.0</apex.version> + <apex.apppackage.classpath>lib/*.jar</apex.apppackage.classpath> + </properties> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-eclipse-plugin</artifactId> + <version>2.9</version> + <configuration> + <downloadSources>true</downloadSources> + </configuration> + </plugin> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <version>3.3</version> + <configuration> + <encoding>UTF-8</encoding> + <source>1.7</source> + <target>1.7</target> + <debug>true</debug> + <optimize>false</optimize> + <showDeprecation>true</showDeprecation> + <showWarnings>true</showWarnings> + </configuration> + </plugin> + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>prepare-package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>target/deps</outputDirectory> + <includeScope>runtime</includeScope> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>app-package-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <finalName>${project.artifactId}-${project.version}-apexapp</finalName> + <appendAssemblyId>false</appendAssemblyId> + <descriptors> + <descriptor>src/assemble/appPackage.xml</descriptor> + </descriptors> + <archiverConfig> + <defaultDirectoryMode>0755</defaultDirectoryMode> + </archiverConfig> + <archive> + <manifestEntries> + <Class-Path>${apex.apppackage.classpath}</Class-Path> + <DT-Engine-Version>${apex.version}</DT-Engine-Version> + <DT-App-Package-Name>${project.artifactId}</DT-App-Package-Name> + <DT-App-Package-Version>${project.version}</DT-App-Package-Version> + <DT-App-Package-Display-Name>${project.name}</DT-App-Package-Display-Name> + <DT-App-Package-Description>${project.description}</DT-App-Package-Description> + </manifestEntries> + </archive> + </configuration> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <version>1.7</version> + <executions> + <execution> + <phase>package</phase> + <configuration> + <target> + <move file="${project.build.directory}/${project.artifactId}-${project.version}-apexapp.jar" + tofile="${project.build.directory}/${project.artifactId}-${project.version}.apa" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + <execution> + <!-- create resource directory for xml javadoc--> + <id>createJavadocDirectory</id> + <phase>generate-resources</phase> + <configuration> + <tasks> + <delete dir="${project.build.directory}/generated-resources/xml-javadoc"/> + <mkdir dir="${project.build.directory}/generated-resources/xml-javadoc"/> + </tasks> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.9.1</version> + <executions> + <execution> + <id>attach-artifacts</id> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>target/${project.artifactId}-${project.version}.apa</file> + <type>apa</type> + </artifact> + </artifacts> + <skipAttach>false</skipAttach> + </configuration> + </execution> + </executions> + </plugin> + + <!-- generate javdoc --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <!-- generate xml javadoc --> + <execution> + <id>xml-doclet</id> + <phase>generate-resources</phase> + <goals> + <goal>javadoc</goal> + </goals> + <configuration> + <doclet>com.github.markusbernhardt.xmldoclet.XmlDoclet</doclet> + <additionalparam>-d ${project.build.directory}/generated-resources/xml-javadoc -filename ${project.artifactId}-${project.version}-javadoc.xml</additionalparam> + <useStandardDocletOptions>false</useStandardDocletOptions> + <docletArtifact> + <groupId>com.github.markusbernhardt</groupId> + <artifactId>xml-doclet</artifactId> + <version>1.0.4</version> + </docletArtifact> + </configuration> + </execution> + </executions> + </plugin> + <!-- Transform xml javadoc to stripped down version containing only class/interface comments and tags--> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>xml-maven-plugin</artifactId> + <version>1.0</version> + <executions> + <execution> + <id>transform-xmljavadoc</id> + <phase>generate-resources</phase> + <goals> + <goal>transform</goal> + </goals> + </execution> + </executions> + <configuration> + <transformationSets> + <transformationSet> + <dir>${project.build.directory}/generated-resources/xml-javadoc</dir> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <stylesheet>XmlJavadocCommentsExtractor.xsl</stylesheet> + <outputDir>${project.build.directory}/generated-resources/xml-javadoc</outputDir> + </transformationSet> + </transformationSets> + </configuration> + </plugin> + <!-- copy xml javadoc to class jar --> + <plugin> + <artifactId>maven-resources-plugin</artifactId> + <version>2.6</version> + <executions> + <execution> + <id>copy-resources</id> + <phase>process-resources</phase> + <goals> + <goal>copy-resources</goal> + </goals> + <configuration> + <outputDirectory>${basedir}/target/classes</outputDirectory> + <resources> + <resource> + <directory>${project.build.directory}/generated-resources/xml-javadoc</directory> + <includes> + <include>${project.artifactId}-${project.version}-javadoc.xml</include> + </includes> + <filtering>true</filtering> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + + </build> + + <dependencies> + <!-- add your dependencies here --> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-library</artifactId> + <version>3.6.0</version> + + <!-- + If you know that your application does not need transitive dependencies pulled in by malhar-library, + uncomment the following to reduce the size of your app package. + --> + <!-- + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + --> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-common</artifactId> + <version>${apex.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.10</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-engine</artifactId> + <version>${apex.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/assemble/appPackage.xml ---------------------------------------------------------------------- diff --git a/examples/partition/src/assemble/appPackage.xml b/examples/partition/src/assemble/appPackage.xml new file mode 100644 index 0000000..7ad071c --- /dev/null +++ b/examples/partition/src/assemble/appPackage.xml @@ -0,0 +1,43 @@ +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/resources</directory> + <outputDirectory>/resources</outputDirectory> + </fileSet> + </fileSets> + +</assembly> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/java/com/example/myapexapp/Application.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/com/example/myapexapp/Application.java b/examples/partition/src/main/java/com/example/myapexapp/Application.java new file mode 100644 index 0000000..e1ca2ff --- /dev/null +++ b/examples/partition/src/main/java/com/example/myapexapp/Application.java @@ -0,0 +1,27 @@ +package com.example.myapexapp; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.annotation.ApplicationAnnotation; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.Context.PortContext; +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.lib.io.ConsoleOutputOperator; + +@ApplicationAnnotation(name="TestStuff") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + RandomNumberGenerator random = dag.addOperator("randomInt", RandomNumberGenerator.class); + TestPartition testPartition = dag.addOperator("testPartition", TestPartition.class); + Codec3 codec = new Codec3(); + dag.setInputPortAttribute(testPartition.in, PortContext.STREAM_CODEC, codec); + + //Add locality if needed, e.g.: .setLocality(Locality.CONTAINER_LOCAL); + dag.addStream("randomData", random.out, testPartition.in); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/java/com/example/myapexapp/Codec3.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/com/example/myapexapp/Codec3.java b/examples/partition/src/main/java/com/example/myapexapp/Codec3.java new file mode 100644 index 0000000..2754e9b --- /dev/null +++ b/examples/partition/src/main/java/com/example/myapexapp/Codec3.java @@ -0,0 +1,13 @@ +package com.example.myapexapp; + +import com.datatorrent.lib.codec.KryoSerializableStreamCodec; + +public class Codec3 extends KryoSerializableStreamCodec<Integer> { + @Override + public int getPartition(Integer tuple) { + final int v = tuple; + return (1 == (v & 1)) ? 0 // odd + : (0 == (v & 3)) ? 1 // divisible by 4 + : 2; // divisible by 2 but not 4 + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java new file mode 100644 index 0000000..de2797b --- /dev/null +++ b/examples/partition/src/main/java/com/example/myapexapp/RandomNumberGenerator.java @@ -0,0 +1,83 @@ +package com.example.myapexapp; + +import java.util.Random; + +import javax.validation.constraints.Min; +import javax.validation.constraints.Size; +import javax.validation.ConstraintViolation; +import javax.validation.ValidatorFactory; +import javax.validation.Validator; +import javax.validation.Validation; + +import com.datatorrent.api.Attribute; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; + +/** + * This is a simple operator that emits random integer. + */ +public class RandomNumberGenerator extends BaseOperator implements InputOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(RandomNumberGenerator.class); + + @Min(1) + private int numTuples = 20; + private transient int count = 0; + + private int sleepTime; + private transient long curWindowId; + private transient Random random = new Random(); + + public final transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>(); + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + + long appWindowId = context.getValue(context.ACTIVATION_WINDOW_ID); + sleepTime = context.getValue(context.SPIN_MILLIS); + LOG.debug("Started setup, appWindowId = {}, sleepTime = {}", appWindowId, sleepTime); + } + + @Override + public void beginWindow(long windowId) + { + count = 0; + LOG.debug("beginWindow: windowId = {}", windowId); + } + + @Override + public void emitTuples() + { + if (count++ < numTuples) { + out.emit(random.nextInt()); + } else { + LOG.debug("count = {}, time = {}", count, System.currentTimeMillis()); + + try { + // avoid repeated calls to this function + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted"); + } + } + } + + public int getNumTuples() + { + return numTuples; + } + + public void setNumTuples(int numTuples) + { + this.numTuples = numTuples; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java b/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java new file mode 100644 index 0000000..1f77e72 --- /dev/null +++ b/examples/partition/src/main/java/com/example/myapexapp/TestPartition.java @@ -0,0 +1,164 @@ +package com.example.myapexapp; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Random; + +import javax.validation.constraints.Max; +import javax.validation.constraints.Min; +import javax.validation.constraints.Size; +import javax.validation.ConstraintViolation; +import javax.validation.ValidatorFactory; +import javax.validation.Validator; +import javax.validation.Validation; + +import com.datatorrent.api.Attribute; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.api.InputOperator; +import com.datatorrent.api.Partitioner; +import com.datatorrent.api.Partitioner.Partition; +import com.datatorrent.api.Partitioner.PartitionKeys; +import com.datatorrent.api.Partitioner.PartitioningContext; + +import com.datatorrent.common.util.BaseOperator; + +/** + * Simple operator to test partitioning + */ +public class TestPartition extends BaseOperator implements Partitioner<TestPartition> +{ + private static final Logger LOG = LoggerFactory.getLogger(TestPartition.class); + + private transient int id; // operator/partition id + private transient long curWindowId; // current window id + private transient long cnt; // per-window tuple count + + @Min(1) @Max(20) + private int nPartitions = 3; + + public final transient DefaultInputPort<Integer> in = new DefaultInputPort<Integer>() { + @Override + public void process(Integer tuple) + { + LOG.debug("{}: tuple = {}, operator id = {}", cnt, tuple, id); + ++cnt; + } + }; + + //public final transient DefaultOutputPort<Integer> out = new DefaultOutputPort<Integer>(); + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + + long appWindowId = context.getValue(context.ACTIVATION_WINDOW_ID); + id = context.getId(); + LOG.debug("Started setup, appWindowId = {}, operator id = {}", appWindowId, id); + } + + @Override + public void beginWindow(long windowId) + { + cnt = 0; + curWindowId = windowId; + LOG.debug("window id = {}, operator id = {}", curWindowId, id); + } + + @Override + public void endWindow() + { + LOG.debug("window id = {}, operator id = {}, cnt = {}", curWindowId, id, cnt); + } + + @Override + public void partitioned(Map<Integer, Partition<TestPartition>> partitions) + { + //Do nothing + } + + @Override + public Collection<Partition<TestPartition>> definePartitions( + Collection<Partition<TestPartition>> partitions, + PartitioningContext context) + { + int oldSize = partitions.size(); + LOG.debug("partitionCount: current = {} requested = {}", oldSize, nPartitions); + + // each partition i in 0...nPartitions receives tuples divisible by i but not by any other + // j in that range; all other tuples ignored + // + if (3 != nPartitions) return getPartitions(partitions, context); + + // special case of 3 partitions: All odd numbers to partition 0; even numbers divisible + // by 4 to partition 1, those divisible by 2 but not 4 to partition 2. + + // mask used to extract discriminant from tuple hashcode + int mask = 0x03; + + Partition<TestPartition>[] newPartitions = new Partition[] { + new DefaultPartition<TestPartition>(new TestPartition()), + new DefaultPartition<TestPartition>(new TestPartition()), + new DefaultPartition<TestPartition>(new TestPartition()) }; + + HashSet<Integer>[] set + = new HashSet[] {new HashSet<>(), new HashSet<>(), new HashSet<>()}; + set[0].add(0); + set[1].add(1); + set[2].add(2); + + PartitionKeys[] keys = { + new PartitionKeys(mask, set[0]), + new PartitionKeys(mask, set[1]), + new PartitionKeys(mask, set[2]) }; + + for (int i = 0; i < 3; ++i ) { + Partition<TestPartition> partition = newPartitions[i]; + partition.getPartitionKeys().put(in, keys[i]); + } + + return new ArrayList<Partition<TestPartition>>(Arrays.asList(newPartitions)); + } // definePartitions + + private Collection<Partition<TestPartition>> getPartitions( + Collection<Partition<TestPartition>> partitions, + PartitioningContext context) + { + // create array of partitions to return + Collection<Partition<TestPartition>> result + = new ArrayList<Partition<TestPartition>>(nPartitions); + + int mask = getMask(nPartitions); + for (int i = 0; i < nPartitions; ++i) { + HashSet<Integer> set = new HashSet<>(); + set.add(i); + PartitionKeys keys = new PartitionKeys(mask, set); + Partition partition = new DefaultPartition<TestPartition>(new TestPartition()); + partition.getPartitionKeys().put(in, keys); + } + + return result; + } // getPartitions + + // return mask with bits 0..N set where N is the highest set bit of argument + private int getMask(final int n) { + return -1 >>> Integer.numberOfLeadingZeros(n); + } // getMask + + // accessors + public int getNPartitions() { return nPartitions; } + public void setNPartitions(int v) { nPartitions = v; } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/resources/META-INF/properties.xml ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/resources/META-INF/properties.xml b/examples/partition/src/main/resources/META-INF/properties.xml new file mode 100644 index 0000000..bf30603 --- /dev/null +++ b/examples/partition/src/main/resources/META-INF/properties.xml @@ -0,0 +1,33 @@ +<?xml version="1.0"?> +<configuration> + <!-- + <property> + <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name> + <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value> + </property> + --> + <!-- memory assigned to app master + <property> + <name>dt.attr.MASTER_MEMORY_MB</name> + <value>1024</value> + </property> + --> + + <!-- log4j configuration for all the operators --> + <property> + <name>dt.operator.*.attr.JVM_OPTIONS</name> + <value> -Dlog4j.configuration=my-log4j.properties</value> + </property> + + <property> + <name>dt.application.*.operator.randomInt.prop.numTuples</name> + <value>10</value> + </property> + <!-- + <property> + <name>dt.application.TestStuff.operator.testPartition.attr.PARTITIONER</name> + <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value> + </property> + --> +</configuration> + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/main/resources/my-log4j.properties ---------------------------------------------------------------------- diff --git a/examples/partition/src/main/resources/my-log4j.properties b/examples/partition/src/main/resources/my-log4j.properties new file mode 100644 index 0000000..21ead89 --- /dev/null +++ b/examples/partition/src/main/resources/my-log4j.properties @@ -0,0 +1,16 @@ +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} %M - %m%n + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +log4j.logger.org=INFO +log4j.logger.org.apache=INFO +log4j.logger.com.datatorrent=INFO + +#log4j.logger.com.example.myapexapp.TestPartition=DEBUG, CONSOLE http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/test/java/com/example/myapexapp/ApplicationTest.java ---------------------------------------------------------------------- diff --git a/examples/partition/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/partition/src/test/java/com/example/myapexapp/ApplicationTest.java new file mode 100644 index 0000000..5f490d8 --- /dev/null +++ b/examples/partition/src/test/java/com/example/myapexapp/ApplicationTest.java @@ -0,0 +1,37 @@ +/** + * Put your copyright and license info here. + */ +package com.example.myapexapp; + +import java.io.IOException; + +import javax.validation.ConstraintViolationException; + +import org.junit.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.junit.Test; + +import com.datatorrent.api.LocalMode; +import com.example.myapexapp.Application; + +/** + * Test the DAG declaration in local mode. + */ +public class ApplicationTest { + + @Test + public void testApplication() throws IOException, Exception { + try { + LocalMode lma = LocalMode.newInstance(); + Configuration conf = new Configuration(false); + conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml")); + lma.prepareDAG(new Application(), conf); + LocalMode.Controller lc = lma.getController(); + lc.run(5000); // runs for 5 seconds and quits + } catch (ConstraintViolationException e) { + Assert.fail("constraint violations: " + e.getConstraintViolations()); + } + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f7c7b7cf/examples/partition/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/examples/partition/src/test/resources/log4j.properties b/examples/partition/src/test/resources/log4j.properties new file mode 100644 index 0000000..3bfcdc5 --- /dev/null +++ b/examples/partition/src/test/resources/log4j.properties @@ -0,0 +1,21 @@ +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n +log4j.appender.RFA.File=/tmp/app.log + +# to enable, add SYSLOG to rootLogger +log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender +log4j.appender.SYSLOG.syslogHost=127.0.0.1 +log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout +log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n +log4j.appender.SYSLOG.Facility=LOCAL1 + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=debug
