Cover distributeddistinct in CI build.
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/9587b03e Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/9587b03e Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/9587b03e Branch: refs/heads/devel-3 Commit: 9587b03e26ab884c6877701dfdbd5778aa2a5a81 Parents: a6ba9a4 Author: Thomas Weise <[email protected]> Authored: Fri Oct 16 20:35:22 2015 -0700 Committer: Thomas Weise <[email protected]> Committed: Fri Oct 16 20:35:22 2015 -0700 ---------------------------------------------------------------------- demos/distributedistinct/pom.xml | 4 +-- .../demos/distributeddistinct/Application.java | 9 +++-- .../UniqueValueCountAppender.java | 17 ++------- .../DistributedDistinctTest.java | 4 +-- .../StatefulUniqueCountTest.java | 5 +-- demos/pom.xml | 36 ++++++++++++-------- 6 files changed, 34 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/pom.xml ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/pom.xml b/demos/distributedistinct/pom.xml index 5f608a7..dac02ae 100644 --- a/demos/distributedistinct/pom.xml +++ b/demos/distributedistinct/pom.xml @@ -22,8 +22,6 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> - <groupId>org.apache.apex</groupId> - <version>3.0.0</version> <artifactId>distributedistinct</artifactId> <packaging>jar</packaging> @@ -33,7 +31,7 @@ <parent> <groupId>org.apache.apex</groupId> <artifactId>malhar-demos</artifactId> - <version>3.0.0</version> + <version>3.2.0-incubating-SNAPSHOT</version> </parent> <properties> http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java index 7f65472..656b083 100644 --- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java +++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/Application.java @@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.datatorrent.api.annotation.ApplicationAnnotation; -import com.datatorrent.lib.algo.UniqueCounterValue; import com.datatorrent.lib.algo.UniqueValueCount; import com.datatorrent.lib.io.ConsoleOutputOperator; import com.datatorrent.lib.stream.Counter; @@ -61,10 +60,10 @@ public class Application implements StreamingApplication dag.addStream("Duplicates", valCount.output, dup.data); dag.addStream("Unverified", dup.out1, verifier.recIn); dag.addStream("EventCount", randGen.verport, verifier.trueIn); - dag.addStream("Verified", verifier.successPort, successcounter.data); - dag.addStream("Failed", verifier.failurePort, failurecounter.data); - dag.addStream("SuccessCount", successcounter.count, successOutput.input); - dag.addStream("FailedCount", failurecounter.count, failureOutput.input); + dag.addStream("Verified", verifier.successPort, successcounter.input); + dag.addStream("Failed", verifier.failurePort, failurecounter.input); + dag.addStream("SuccessCount", successcounter.output, successOutput.input); + dag.addStream("FailedCount", failurecounter.output, failureOutput.input); dag.addStream("Output", dup.out2, consOut.input); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java index e613e32..7c91f77 100644 --- a/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java +++ b/demos/distributedistinct/src/main/java/com/datatorrent/demos/distributeddistinct/UniqueValueCountAppender.java @@ -42,7 +42,6 @@ import com.datatorrent.lib.db.jdbc.JDBCLookupCacheBackedOperator; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultPartition; import com.datatorrent.api.Partitioner; - import com.datatorrent.netlet.util.DTThrowable; /** @@ -91,7 +90,7 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO public void setup(Context.OperatorContext context) { super.setup(context); - LOGGER.debug("store properties {} {}", store.getDbDriver(), store.getDbUrl()); + LOGGER.debug("store properties {} {}", store.getDatabaseDriver(), store.getDatabaseUrl()); LOGGER.debug("table name {}", tableName); windowID = context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID); try { @@ -197,19 +196,9 @@ public abstract class UniqueValueCountAppender<V> extends JDBCLookupCacheBackedO * rollback, each partition will only clear the data that it is responsible for. */ @Override - public Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> definePartitions(Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> partitions, int incrementalCapacity) + public Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> definePartitions(Collection<com.datatorrent.api.Partitioner.Partition<UniqueValueCountAppender<V>>> partitions, PartitioningContext context) { - final int finalCapacity; - - //In the case of parallel partitioning - if(incrementalCapacity != 0) { - finalCapacity = incrementalCapacity; - } - //Do normal partitioning - else { - finalCapacity = partitionCount; - } - + final int finalCapacity = DefaultPartition.getRequiredPartitionCount(context, this.partitionCount); UniqueValueCountAppender<V> anOldOperator = partitions.iterator().next().getPartitionedInstance(); partitions.clear(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java index f5acbd6..d32047a 100644 --- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java +++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/DistributedDistinctTest.java @@ -192,8 +192,8 @@ public class DistributedDistinctTest attributes.put(DAG.APPLICATION_PATH, applicationPath); attributes.put(OperatorContext.ACTIVATION_WINDOW_ID, 0L); valueCounter.setTableName(TABLE_NAME); - valueCounter.getStore().setDbDriver(INMEM_DB_DRIVER); - valueCounter.getStore().setDbUrl(INMEM_DB_URL); + valueCounter.getStore().setDatabaseDriver(INMEM_DB_DRIVER); + valueCounter.getStore().setDatabaseUrl(INMEM_DB_URL); TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributes); valueCounter.setup(context); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java ---------------------------------------------------------------------- diff --git a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java index 93ec636..55f1c8e 100644 --- a/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java +++ b/demos/distributedistinct/src/test/java/com/datatorrent/demos/distributeddistinct/StatefulUniqueCountTest.java @@ -30,6 +30,7 @@ import org.junit.Test; import com.datatorrent.api.*; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.demos.distributeddistinct.IntegerUniqueValueCountAppender; import com.datatorrent.lib.algo.UniqueValueCount; @@ -41,7 +42,7 @@ public class StatefulUniqueCountTest public static final String INMEM_DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true"; public static final String INMEM_DB_DRIVER = "org.hsqldb.jdbc.JDBCDriver"; public static final String TABLE_NAME = "Test_Lookup_Cache"; - + static class KeyGen implements InputOperator { @@ -193,7 +194,7 @@ public class StatefulUniqueCountTest dag.addStream("ResultsOut", uniqueOut, verifyTable.input); } } - + @BeforeClass public static void setup(){ try { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/9587b03e/demos/pom.xml ---------------------------------------------------------------------- diff --git a/demos/pom.xml b/demos/pom.xml index 123591f..97382ba 100644 --- a/demos/pom.xml +++ b/demos/pom.xml @@ -32,21 +32,6 @@ <packaging>pom</packaging> <name>Apache Apex Malhar Demos</name> - <modules> - <module>machinedata</module> - <module>pi</module> - <module>twitter</module> - <module>yahoofinance</module> - <module>frauddetect</module> - <module>mobile</module> - <module>wordcount</module> - <module>mrmonitor</module> - <module>mroperator</module> - <module>uniquecount</module> - <module>r</module> - <module>echoserver</module> - </modules> - <properties> <apex.core.version>3.2.0-incubating-SNAPSHOT</apex.core.version> <datatorrent.apppackage.classpath>lib/*.jar</datatorrent.apppackage.classpath> @@ -177,8 +162,29 @@ </plugins> </build> </profile> + <profile> + <id>all-modules</id> + <modules> + <module>distributedistinct</module> + </modules> + </profile> </profiles> + <modules> + <module>machinedata</module> + <module>pi</module> + <module>twitter</module> + <module>yahoofinance</module> + <module>frauddetect</module> + <module>mobile</module> + <module>wordcount</module> + <module>mrmonitor</module> + <module>mroperator</module> + <module>uniquecount</module> + <module>r</module> + <module>echoserver</module> + </modules> + <dependencies> <dependency> <groupId>org.apache.apex</groupId>
