Repository: apex-malhar Updated Branches: refs/heads/master 2493bcbf5 -> 8e44a9cab
APEXMALHAR-2471 Upgrading APEXCORE dependency to version 3.6.0 Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8e44a9ca Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8e44a9ca Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8e44a9ca Branch: refs/heads/master Commit: 8e44a9cab014aa40e25d06ca324ef5fd70aec152 Parents: 2493bcb Author: ajaygit158 <[email protected]> Authored: Tue Apr 25 11:28:01 2017 +0530 Committer: ajaygit158 <[email protected]> Committed: Thu May 25 11:31:45 2017 +0530 ---------------------------------------------------------------------- .../benchmark/WordCountOperator.java | 2 +- .../HBaseTransactionalPutOperatorTest.java | 131 ++----------------- library/pom.xml | 31 +++++ .../datatorrent/lib/math/RunningAverage.java | 6 +- .../com/datatorrent/lib/stream/Counter.java | 2 +- .../lib/io/fs/FileSplitterBaseTest.java | 5 +- .../malhar/lib/dedup/DeduperOrderingTest.java | 6 +- pom.xml | 4 +- .../stream/api/impl/ApexStreamImplTest.java | 8 +- 9 files changed, 61 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java ---------------------------------------------------------------------- diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java index 6e91482..8c55404 100644 --- a/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java +++ b/benchmark/src/main/java/com/datatorrent/benchmark/WordCountOperator.java @@ -44,7 +44,7 @@ public class WordCountOperator<T> implements Operator @Override public void process(T tuple) { - count++; + WordCountOperator.this.count++; } }; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java index 3cdc1bf..665cd40 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java @@ -19,19 +19,19 @@ package com.datatorrent.contrib.hbase; import java.io.IOException; -import java.util.Collection; -import org.apache.hadoop.hbase.client.Put; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.api.Attribute; +import org.apache.hadoop.hbase.client.Put; + import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Operator.ProcessingMode; +import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext; /** * */ @@ -55,44 +55,9 @@ public class HBaseTransactionalPutOperatorTest { t1.setColFamily("colfam0");t1.setColName("street");t1.setRow("row1");t1.setColValue("ts"); HBaseTuple t2=new HBaseTuple(); t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc"); - thop.setup(new OperatorContext() { - - @Override - public <T> T getValue(Attribute<T> key) { - if(key.equals(PROCESSING_MODE)){ - return (T) ProcessingMode.AT_LEAST_ONCE; - } - return key.defaultValue; - } - - @Override - public AttributeMap getAttributes() { - return null; - } - - @Override - public int getId() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void setCounters(Object counters) { - // TODO Auto-generated method stub - - } - - @Override - public void sendMetrics(Collection<String> collection) - { - } - - @Override - public int getWindowsFromCheckpoint() - { - return 0; - } - }); + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE); + thop.setup(mockOperatorContext(0, attributeMap)); thop.beginWindow(0); thop.input.process(t1); thop.input.process(t2); @@ -125,44 +90,9 @@ public class HBaseTransactionalPutOperatorTest { t1.setColFamily("colfam0");t1.setColName("street");t1.setRow("row1");t1.setColValue("ts"); HBaseTuple t2=new HBaseTuple(); t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc"); - thop.setup(new OperatorContext() { - - @Override - public <T> T getValue(Attribute<T> key) { - if(key.equals(PROCESSING_MODE)){ - return (T) ProcessingMode.AT_MOST_ONCE; - } - return key.defaultValue; - } - - @Override - public AttributeMap getAttributes() { - return null; - } - - @Override - public int getId() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void setCounters(Object counters) { - // TODO Auto-generated method stub - - } - - @Override - public void sendMetrics(Collection<String> collection) - { - } - - @Override - public int getWindowsFromCheckpoint() - { - return 0; - } - }); + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE); + thop.setup(mockOperatorContext(0, attributeMap)); thop.beginWindow(0); thop.input.process(t1); thop.input.process(t2); @@ -199,46 +129,9 @@ public class HBaseTransactionalPutOperatorTest { t2.setColFamily("colfam0");t2.setColName("city");t2.setRow("row2");t2.setColValue("tc"); thop.beginWindow(0); thop.input.process(t1); - thop.setup(new OperatorContext() { - - @Override - public <T> T getValue(Attribute<T> key) { - if(key.equals(PROCESSING_MODE)){ - return (T) ProcessingMode.AT_MOST_ONCE; - } - return key.defaultValue; - } - - @Override - public AttributeMap getAttributes() { - return null; - } - - @Override - public int getId() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public void setCounters(Object counters) { - // TODO Auto-generated method stub - - } - - @Override - public void sendMetrics(Collection<String> collection) - { - } - - @Override - public int getWindowsFromCheckpoint() - { - return 0; - } - }); - - + AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap(); + attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_MOST_ONCE); + thop.setup(mockOperatorContext(0, attributeMap)); thop.input.process(t2); thop.endWindow(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/pom.xml ---------------------------------------------------------------------- diff --git a/library/pom.xml b/library/pom.xml index 609d537..17908dd 100644 --- a/library/pom.xml +++ b/library/pom.xml @@ -366,6 +366,37 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + <type>test-jar</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + <version>3.1.0</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>1.7.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.datatorrent</groupId> + <artifactId>netlet</artifactId> + <version>1.3.1</version> + <scope>provided</scope> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java index 163f06b..e3b0edf 100644 --- a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java +++ b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java @@ -64,9 +64,9 @@ public class RunningAverage extends BaseOperator @Override public void process(Number tuple) { - double sum = (count * average) + tuple.doubleValue(); - count++; - average = sum / count; + double sum = (RunningAverage.this.count * average) + tuple.doubleValue(); + RunningAverage.this.count++; + average = sum / RunningAverage.this.count; } }; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/main/java/com/datatorrent/lib/stream/Counter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/Counter.java b/library/src/main/java/com/datatorrent/lib/stream/Counter.java index 8de2653..d227988 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/Counter.java +++ b/library/src/main/java/com/datatorrent/lib/stream/Counter.java @@ -50,7 +50,7 @@ public class Counter implements Operator, Unifier<Integer> @Override public void process(Object tuple) { - count++; + Counter.this.count++; } }; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java index c982ee4..9ec709e 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterBaseTest.java @@ -220,7 +220,6 @@ public class FileSplitterBaseTest @Override public void populateDAG(DAG dag, Configuration configuration) { - dag.setAttribute(DAG.APPLICATION_PATH, baseTestMeta.dataDirectory); MockFileInput fileInput = dag.addOperator("Input", new MockFileInput()); fileInput.filePaths = baseTestMeta.filePaths; @@ -246,8 +245,8 @@ public class FileSplitterBaseTest @Override public void process(FileSplitterInput.FileMetadata fileMetadata) { - count++; - LOG.debug("count {}", count); + MockReceiver.this.count++; + LOG.debug("count {}", MockReceiver.this.count); } }; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java index b4f76a6..157f505 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/dedup/DeduperOrderingTest.java @@ -130,7 +130,7 @@ public class DeduperOrderingTest if (pojo.getSequence() < prevSequence) { testFailed = true; } - count++; + Verifier.this.count++; prevSequence = pojo.sequence; } }; @@ -144,7 +144,7 @@ public class DeduperOrderingTest if (pojo.getSequence() < prevSequence) { testFailed = true; } - count++; + Verifier.this.count++; prevSequence = pojo.sequence; } }; @@ -158,7 +158,7 @@ public class DeduperOrderingTest if (pojo.getSequence() < prevSequence) { testFailed = true; } - count++; + Verifier.this.count++; prevSequence = pojo.sequence; } }; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index adc6de5..dc9ed8b 100644 --- a/pom.xml +++ b/pom.xml @@ -25,7 +25,7 @@ <parent> <groupId>org.apache.apex</groupId> <artifactId>apex</artifactId> - <version>3.4.0</version> + <version>3.6.0</version> </parent> <groupId>org.apache.apex</groupId> @@ -49,7 +49,7 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.deploy.skip>false</maven.deploy.skip> - <apex.core.version>3.4.0</apex.core.version> + <apex.core.version>3.6.0</apex.core.version> <semver.plugin.skip>false</semver.plugin.skip> <surefire.args>-Xmx2048m</surefire.args> </properties> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e44a9ca/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java ---------------------------------------------------------------------- diff --git a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java index 99d5ca6..5cc5d98 100644 --- a/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java +++ b/stream/src/test/java/org/apache/apex/malhar/stream/api/impl/ApexStreamImplTest.java @@ -32,6 +32,7 @@ import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta; import static org.apache.apex.malhar.stream.api.Option.Options.name; @@ -79,8 +80,11 @@ public class ApexStreamImplTest // Assert the stream is from first operator to second operator Assert.assertEquals("first", stream.getSource().getOperatorMeta().getName()); - Assert.assertTrue(1 == stream.getSinks().size()); - Assert.assertEquals("second", stream.getSinks().get(0).getOperatorWrapper().getName()); + Collection<InputPortMeta> portMetaCollection = stream.getSinks(); + Assert.assertTrue(1 == portMetaCollection.size()); + for (InputPortMeta inputPortMeta : portMetaCollection) { + Assert.assertEquals("second", inputPortMeta.getOperatorMeta().getName()); + } // Assert the stream is thread local Assert.assertTrue(stream.getLocality() == DAG.Locality.THREAD_LOCAL);
