http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java index a19417c..2bbb903 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java @@ -74,7 +74,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator transient Channel channel = null; transient String exchange = "testEx"; transient String queueName="testQ"; - + private WindowDataManager windowDataManager; private transient long currentWindowId; private transient long largestRecoveryWindowId; @@ -86,7 +86,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator @Override public void setup(OperatorContext context) { - // Needed to setup idempotency storage manager in setter + // Needed to setup idempotency storage manager in setter this.context = context; this.operatorContextId = context.getId(); @@ -104,11 +104,11 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator DTThrowable.rethrow(ex); } } - + @Override public void beginWindow(long windowId) { - currentWindowId = windowId; + currentWindowId = windowId; largestRecoveryWindowId = windowDataManager.getLargestCompletedWindow(); if (windowId <= largestRecoveryWindowId) { // Do not resend already sent tuples @@ -119,7 +119,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator skipProcessingTuple = false; } } - + /** * {@inheritDoc} */ @@ -158,11 +158,11 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator logger.debug(ex.toString()); } } - + public WindowDataManager getWindowDataManager() { return windowDataManager; } - + public void setWindowDataManager(WindowDataManager windowDataManager) { this.windowDataManager = windowDataManager; }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java index 74ae181..1ddd9d4 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperator.java @@ -38,7 +38,7 @@ import com.datatorrent.netlet.util.DTThrowable; public class RabbitMQOutputOperator extends AbstractSinglePortRabbitMQOutputOperator<byte[]> { private static final Logger logger = LoggerFactory.getLogger(RabbitMQOutputOperator.class); - + @Override public void processTuple(byte[] tuple) { @@ -46,6 +46,6 @@ public class RabbitMQOutputOperator extends AbstractSinglePortRabbitMQOutputOper channel.basicPublish(exchange, "", null, tuple); } catch (IOException e) { DTThrowable.rethrow(e); - } + } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index 59b320d..0b12574 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -38,7 +38,7 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; /** * This is the base implementation of a Redis input operator. - * + * * @displayName Abstract Redis Input * @category Input * @tags redis, key value @@ -161,7 +161,7 @@ public abstract class AbstractRedisInputOperator<T> extends AbstractKeyValueStor scanComplete = false; scanParameters = new ScanParams(); scanParameters.count(scanCount); - + // For the 1st window after checkpoint, windowID - 1 would not have recovery // offset stored in windowDataManager // But recoveryOffset is non-transient, so will be recovered with http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java index de9ee45..ae8ef5c 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisKeyValueInputOperator.java @@ -28,7 +28,7 @@ import com.datatorrent.lib.util.KeyValPair; * This is the an implementation of a Redis input operator for fetching * Key-Value pair stored in Redis. It takes in keys to fetch and emits * corresponding <Key, Value> Pair. Value data type is String in this case. - * + * * @displayName Redis Input Operator for Key Value pair * @category Store * @tags input operator, key value http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java index a9913f9..156252b 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisMapAsValueInputOperator.java @@ -25,8 +25,8 @@ import com.datatorrent.lib.util.KeyValPair; /** * This is the an implementation of a Redis input operator It takes in keys to * fetch and emits Values stored as Maps in Redis i.e. when value datatype in - * Redis is HashMap - * + * Redis is HashMap + * * @displayName Redis Input Operator for Map * @category Store * @tags input operator, key value http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java index 27b4fd8..b540779 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/RedisStore.java @@ -192,7 +192,7 @@ public class RedisStore implements TransactionableKeyValueStore } /** - * Gets the stored Map for given the key, when the value data type is a map, stored with hmset + * Gets the stored Map for given the key, when the value data type is a map, stored with hmset * * @param key * @return hashmap stored for the key. @@ -329,8 +329,8 @@ public class RedisStore implements TransactionableKeyValueStore } } } - - + + /** * @return the timeOut http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java index 705e09c..805238c 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/solr/AbstractSolrOutputOperator.java @@ -84,7 +84,7 @@ public abstract class AbstractSolrOutputOperator<T, S extends Connectable> exten /** * Converts the object into Solr document format - * + * * @param object to be stored to Solr Server * @return */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java b/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java index 3e86949..d9237c2 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java +++ b/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java @@ -110,7 +110,7 @@ public class ConcurrentUpdateSolrServerConnector extends SolrServerConnector } /* - * HttpClient instance + * HttpClient instance * Gets the HTTP Client instance */ public HttpClient getHttpClient() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java index ede1f3a..3abdb1b 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java +++ b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkStore.java @@ -18,7 +18,7 @@ */ package com.datatorrent.contrib.splunk; -import com.splunk.*; +import com.splunk.*; import javax.validation.constraints.NotNull; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java index 15aaa0b..05d5e52 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/zmq/ZeroMQInputOperator.java @@ -31,7 +31,7 @@ package com.datatorrent.contrib.zmq; public class ZeroMQInputOperator extends AbstractSinglePortZeroMQInputOperator<byte[]> { @Override - public byte[] getTuple(byte[] message) { + public byte[] getTuple(byte[] message) { return message; } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java index 146a65d..51d90ab 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Change.java @@ -25,7 +25,7 @@ import com.datatorrent.lib.util.BaseNumberValueOperator; /** * Operator compares data values arriving on input port with base value input operator. - * + * * <p> * Arriving base value is stored in operator for comparison, old base value is overwritten. * This emits <change in value,percentage change>. @@ -80,7 +80,7 @@ public class Change<V extends Number> extends BaseNumberValueOperator<V> } } }; - + /** * Input port that takes a number It stores the value for base comparison. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java index 155cb23..bfa3c0a 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMap.java @@ -31,7 +31,7 @@ import com.datatorrent.lib.util.UnifierHashMap; * Operator compares based on the property "key", "value", and "compare". * <p> * The comparison is done by getting double value from the Number. - * Passed tuples are emitted on the output port "compare". + * Passed tuples are emitted on the output port "compare". * Failed tuples are emitted on port "except". * Both output ports are optional, but at least one has to be connected. * This module is a pass through<br> @@ -91,7 +91,7 @@ public class CompareExceptMap<K, V extends Number> extends MatchMap<K, V> */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<HashMap<K, V>> compare = match; - + /** * Output port that emits a hashmap of non matching tuples after comparison. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java index 3dcae74..2dcb583 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/ExceptMap.java @@ -66,7 +66,7 @@ import com.datatorrent.lib.util.UnifierHashMap; @Deprecated @Stateless public class ExceptMap<K, V extends Number> extends MatchMap<K, V> -{ +{ /** * Output port that emits non matching number tuples. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java index e1deb9d..8909acd 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/Quotient.java @@ -24,7 +24,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.lib.util.BaseNumberValueOperator; /** - * This operator adds all the values on "numerator" and "denominator" and emits quotient at end of window. + * This operator adds all the values on "numerator" and "denominator" and emits quotient at end of window. * <p> * <br> * <b>StateFull : Yes </b>, Sum of values is taken over application window. <br> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java index 3581b81..b37bbd5 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/QuotientMap.java @@ -31,7 +31,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.lib.util.BaseNumberKeyValueOperator; /** - * Add all the values for each key on "numerator" and "denominator" and emits quotient at end of window for all keys in the denominator. + * Add all the values for each key on "numerator" and "denominator" and emits quotient at end of window for all keys in the denominator. * <p> * <br> * Application can set multiplication value for quotient(default = 1). <br> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java index 048eff7..b2493a1 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/math/SumCountMap.java @@ -178,7 +178,7 @@ public class SumCountMap<K, V extends Number> extends return ret; } }; - + /** * Key,short sum output port. */ @@ -194,7 +194,7 @@ public class SumCountMap<K, V extends Number> extends return ret; } }; - + /** * Key,float sum output port. */ @@ -210,7 +210,7 @@ public class SumCountMap<K, V extends Number> extends return ret; } }; - + /** * Key,integer sum output port. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java index 38a4804..1f8dc5c 100644 --- a/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java +++ b/contrib/src/main/java/org/apache/apex/malhar/contrib/parser/StreamingJsonParser.java @@ -61,7 +61,7 @@ import com.datatorrent.lib.util.PojoUtils; * <b>err</b>:tuples that could not be parsed are emitted on this port as * KeyValPair<String,String><br> * Key being the tuple and Val being the reason - * + * * @displayName SimpleStreamingJsonParser * @category Parsers * @tags json pojo parser streaming @@ -187,7 +187,7 @@ public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, Strin /** * Creates a map representing fieldName in POJO:field in JSON:Data type - * + * * @return List of FieldInfo */ private List<FieldInfo> createFieldInfoMap(String str) @@ -255,7 +255,7 @@ public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, Strin /** * Use reflection to generate field info values if the user has not provided * the inputs mapping - * + * * @return String representing the POJO field to JSON field mapping */ private String generateFieldInfoInputs(Class<?> cls) @@ -331,7 +331,7 @@ public class StreamingJsonParser extends Parser<byte[], KeyValPair<String, Strin /** * Returns a POJO from a Generic Record Null is set as the default value if a * key is not found in the parsed JSON - * + * * @return Object */ @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java b/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java index 9c99ad2..f57279d 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/couchbase/CouchBaseSetTest.java @@ -90,7 +90,7 @@ public class CouchBaseSetTest System.err.println("Error connecting to Couchbase: " + e.getMessage()); System.exit(1); } - + TestPojo obj = new TestPojo(); obj.setName("test"); obj.setPhone(123344555); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java index 161fe90..671d7dc 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchOperatorTest.java @@ -89,7 +89,7 @@ public class ElasticSearchOperatorTest ElasticSearchMapOutputOperator<Map<String, Object>> operator = new ElasticSearchMapOutputOperator<Map<String, Object>>() { /* * (non-Javadoc) - * + * * @see com.datatorrent.contrib.elasticsearch. AbstractElasticSearchOutputOperator #processTuple(java.lang.Object) */ @Override @@ -128,7 +128,7 @@ public class ElasticSearchOperatorTest /** * Read data written to elastic search - * + * * @param tupleIDs * @param testStartTime */ @@ -137,7 +137,7 @@ public class ElasticSearchOperatorTest ElasticSearchMapInputOperator<Map<String, Object>> operator = new ElasticSearchMapInputOperator<Map<String, Object>>() { /** * Set SearchRequestBuilder parameters specific to current window. - * + * * @see com.datatorrent.contrib.elasticsearch.ElasticSearchMapInputOperator#getSearchRequestBuilder() */ @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java b/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java index f707f1b..daf1602 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolateTest.java @@ -77,9 +77,9 @@ public class ElasticSearchPercolateTest /** * Register percolate queries on ElasticSearch - * + * * @throws IOException - * + * */ private void registerPercolateQueries() throws IOException { @@ -89,7 +89,7 @@ public class ElasticSearchPercolateTest } /** - * + * */ private void checkPercolateResponse() { @@ -136,7 +136,7 @@ public class ElasticSearchPercolateTest matchIds.add(match.getId().toString()); } Collections.sort(matchIds); - + Assert.assertArrayEquals(matchIds.toArray(), matches[i]); i++; } @@ -157,6 +157,6 @@ public class ElasticSearchPercolateTest //This indicates that elasticsearch is not running on a particular machine. //Silently ignore in this case. } - + } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java index 5c59622..5bde40c 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/geode/GeodeCheckpointStoreTest.java @@ -59,7 +59,7 @@ public class GeodeCheckpointStoreTest store.setTableName(REGION_NAME); store.connect(); } - + @Test public void testSave() throws IOException { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java index 4e6bb39..6a2f891 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOInputOperatorTest.java @@ -54,7 +54,7 @@ public class HBasePOJOInputOperatorTest HBASEINPUT, OUTPUT }; - + public static class MyGenerator extends TupleGenerateCacheOperator<TestPOJO> { public MyGenerator() @@ -84,7 +84,7 @@ public class HBasePOJOInputOperatorTest private HBaseStore store; private HBasePOJOPutOperator hbaseOutputOperator; private TestHBasePOJOInputOperator hbaseInputOperator; - + @Before public void prepare() throws Exception { @@ -93,13 +93,13 @@ public class HBasePOJOInputOperatorTest setupOperators(); HBaseUtil.createTable( store.getConfiguration(), store.getTableName()); } - + @After public void cleanup() throws Exception { HBaseUtil.deleteTable( store.getConfiguration(), store.getTableName()); } - + @Test public void test() throws Exception @@ -119,20 +119,20 @@ public class HBasePOJOInputOperatorTest // Create ActiveMQStringSinglePortOutputOperator MyGenerator generator = dag.addOperator( OPERATOR.GENERATOR.name(), MyGenerator.class); generator.setTupleNum( TUPLE_NUM ); - + hbaseOutputOperator = dag.addOperator( OPERATOR.HBASEOUTPUT.name(), hbaseOutputOperator ); hbaseInputOperator = dag.addOperator(OPERATOR.HBASEINPUT.name(), hbaseInputOperator); dag.setOutputPortAttribute(hbaseInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, TestPOJO.class); - - + + TupleCacheOutputOperator output = dag.addOperator(OPERATOR.OUTPUT.name(), TupleCacheOutputOperator.class); - + // Connect ports dag.addStream("queue1", generator.outputPort, hbaseOutputOperator.input ).setLocality(DAG.Locality.NODE_LOCAL); dag.addStream("queue2", hbaseInputOperator.outputPort, output.inputPort ).setLocality(DAG.Locality.NODE_LOCAL); - - + + Configuration conf = new Configuration(false); lma.prepareDAG(app, conf); @@ -158,10 +158,10 @@ public class HBasePOJOInputOperatorTest throw new RuntimeException("Testcase taking too long"); } } - + lc.shutdown(); - + validate( generator.getTuples(), output.getReceivedTuples() ); } @@ -173,11 +173,11 @@ public class HBasePOJOInputOperatorTest actual.removeAll(expected); Assert.assertTrue( "content not same.", actual.isEmpty() ); } - + protected void setupOperators() { TableInfo<HBaseFieldInfo> tableInfo = new TableInfo<HBaseFieldInfo>(); - + tableInfo.setRowOrIdExpression("row"); List<HBaseFieldInfo> fieldsInfo = new ArrayList<HBaseFieldInfo>(); @@ -186,10 +186,10 @@ public class HBasePOJOInputOperatorTest fieldsInfo.add( new HBaseFieldInfo( "address", "address", SupportType.STRING, "f1") ); tableInfo.setFieldsInfo(fieldsInfo); - + hbaseInputOperator.setTableInfo(tableInfo); hbaseOutputOperator.setTableInfo(tableInfo); - + store = new HBaseStore(); store.setTableName("test"); store.setZookeeperQuorum("localhost"); @@ -197,7 +197,7 @@ public class HBasePOJOInputOperatorTest hbaseInputOperator.setStore(store); hbaseOutputOperator.setStore(store); - + OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( OPERATOR_ID, new AttributeMap.DefaultAttributeMap()); hbaseInputOperator.setup(context); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java index 51dbadc..8c81560 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBasePOJOPutOperatorTest.java @@ -52,11 +52,11 @@ public class HBasePOJOPutOperatorTest private static final Logger logger = LoggerFactory.getLogger(HBasePOJOPutOperatorTest.class); public static final int TEST_SIZE = 15000; public static final int WINDOW_SIZE = 1500; - + private HBasePOJOPutOperator operator; - + private final long startWindowId = Calendar.getInstance().getTimeInMillis(); - + public HBasePOJOPutOperatorTest() { } @@ -69,13 +69,13 @@ public class HBasePOJOPutOperatorTest createOrDeleteTable(operator.getStore(), false ); } - + @After public void cleanup() throws Exception { createOrDeleteTable(operator.getStore(), true ); } - + /** * this test case only test if HBasePojoPutOperator can save data to the * HBase. it doesn't test connection to the other operators @@ -107,7 +107,7 @@ public class HBasePOJOPutOperatorTest Thread.sleep(30000); - + } catch (Exception e) { @@ -115,7 +115,7 @@ public class HBasePOJOPutOperatorTest Assert.fail(e.getMessage()); } } - + protected void createOrDeleteTable(HBaseStore store, boolean isDelete ) throws Exception { HBaseAdmin admin = null; @@ -123,7 +123,7 @@ public class HBasePOJOPutOperatorTest { admin = new HBaseAdmin(store.getConfiguration()); final String tableName = store.getTableName(); - + if (!admin.isTableAvailable(tableName) && !isDelete ) { HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); @@ -170,14 +170,14 @@ public class HBasePOJOPutOperatorTest OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext( OPERATOR_ID, attributeMap); - + operator.setup(context); } protected void configure(HBasePOJOPutOperator operator) { TableInfo<HBaseFieldInfo> tableInfo = new TableInfo<HBaseFieldInfo>(); - + tableInfo.setRowOrIdExpression("row"); List<HBaseFieldInfo> fieldsInfo = new ArrayList<HBaseFieldInfo>(); @@ -203,7 +203,7 @@ public class HBasePOJOPutOperatorTest { if( tupleGenerator == null ) tupleGenerator = new TupleGenerator<TestPOJO>( TestPOJO.class ); - + return tupleGenerator.getNextTuple(); } @@ -225,21 +225,21 @@ public class HBasePOJOPutOperatorTest HTable table = operator.getStore().getTable(); Scan scan = new Scan(); ResultScanner resultScanner = table.getScanner(scan); - + int recordCount = 0; while( true ) { Result result = resultScanner.next(); if( result == null ) break; - + int rowId = Integer.valueOf( Bytes.toString( result.getRow() ) ); Assert.assertTrue( "rowId="+rowId+" aut of range" , ( rowId > 0 && rowId <= TEST_SIZE ) ); Assert.assertTrue( "the rowId="+rowId+" already processed.", rowIds[rowId-1] == 1 ); rowIds[rowId-1]=0; - + List<Cell> cells = result.listCells(); - + Map<String, byte[]> map = new HashMap<String,byte[]>(); for( Cell cell : cells ) { @@ -250,17 +250,17 @@ public class HBasePOJOPutOperatorTest TestPOJO read = TestPOJO.from(map); read.setRowId((long)rowId); TestPOJO expected = new TestPOJO( rowId ); - + Assert.assertTrue( String.format( "expected %s, get %s ", expected.toString(), read.toString() ), expected.completeEquals(read) ); recordCount++; } - + int missedCount = 0; if( recordCount != TEST_SIZE ) { logger.error( "unsaved records: " ); StringBuilder sb = new StringBuilder(); - + for( int i=0; i<TEST_SIZE; ++i ) { if( rowIds[i] != 0 ) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java index eef69d4..3cdc1bf 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseTransactionalPutOperatorTest.java @@ -66,7 +66,7 @@ public class HBaseTransactionalPutOperatorTest { } @Override - public AttributeMap getAttributes() { + public AttributeMap getAttributes() { return null; } @@ -136,7 +136,7 @@ public class HBaseTransactionalPutOperatorTest { } @Override - public AttributeMap getAttributes() { + public AttributeMap getAttributes() { return null; } @@ -210,7 +210,7 @@ public class HBaseTransactionalPutOperatorTest { } @Override - public AttributeMap getAttributes() { + public AttributeMap getAttributes() { return null; } @@ -238,8 +238,8 @@ public class HBaseTransactionalPutOperatorTest { } }); - - + + thop.input.process(t2); thop.endWindow(); HBaseTuple tuple,tuple2; @@ -257,7 +257,7 @@ public class HBaseTransactionalPutOperatorTest { logger.error(e.getMessage()); } } - + public static class TestHBasePutOperator extends AbstractHBaseWindowPutOutputOperator<HBaseTuple> { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java index 9c237d7..5b54f3e 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java +++ b/contrib/src/test/java/com/datatorrent/contrib/hbase/HBaseUtil.java @@ -29,13 +29,13 @@ import org.apache.hadoop.hbase.client.HBaseAdmin; public class HBaseUtil { - public static void createTable(Configuration configuration, String tableName ) throws MasterNotRunningException, ZooKeeperConnectionException, IOException + public static void createTable(Configuration configuration, String tableName ) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { HBaseAdmin admin = null; try { admin = new HBaseAdmin( configuration ); - + if (!admin.isTableAvailable(tableName) ) { HTableDescriptor tableDescriptor = new HTableDescriptor(tableName); @@ -54,14 +54,14 @@ public class HBaseUtil } } } - + public static void deleteTable( Configuration configuration, String tableName ) throws MasterNotRunningException, ZooKeeperConnectionException, IOException { HBaseAdmin admin = null; try { admin = new HBaseAdmin( configuration ); - + if ( admin.isTableAvailable(tableName) ) { admin.disableTable(tableName); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java b/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java index e2eec7b..5465c28 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java +++ b/contrib/src/test/java/com/datatorrent/contrib/helper/MessageQueueTestHelper.java @@ -47,7 +47,7 @@ public class MessageQueueTestHelper { } } - public static ArrayList<HashMap<String, Integer>> getMessages() + public static ArrayList<HashMap<String, Integer>> getMessages() { ArrayList<HashMap<String, Integer>> mapList = new ArrayList<HashMap<String, Integer>>(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java index e20a9fe..5f32fb0 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaExactlyOnceOutputOperatorTest.java @@ -45,7 +45,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase private static final int maxTuple = 40; private static CountDownLatch latch; private static boolean isRestarted = false; - + /** * Tuple generator for testing. */ @@ -68,7 +68,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase @Override public void setup(OperatorContext context) { - + } @Override @@ -110,7 +110,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase for (int i = stringBuffer.size(); i-- > 0;) { if (i == 20 && isRestarted == false) { isRestarted = true; - // fail the operator and when it gets back resend everything + // fail the operator and when it gets back resend everything throw new RuntimeException(); } outputPort.emit(stringBuffer.poll()); @@ -144,7 +144,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase StringGeneratorInputOperator generator = dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class); final SimpleKafkaExactOnceOutputOperator node = dag.addOperator("Kafka message producer", SimpleKafkaExactOnceOutputOperator.class); - + Properties props = new Properties(); props.setProperty("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", "localhost:9092"); @@ -152,7 +152,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase props.setProperty("queue.buffering.max.ms", "200"); props.setProperty("queue.buffering.max.messages", "10"); props.setProperty("batch.num.messages", "5"); - + node.setConfigProperties(props); // Set configuration parameters for Kafka node.setTopic("topic1"); @@ -160,14 +160,14 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase // Connect ports dag.addStream("Kafka message", generator.outputPort, node.inputPort).setLocality(Locality.CONTAINER_LOCAL); - + // Create local cluster final LocalMode.Controller lc = lma.getController(); lc.runAsync(); Future f = Executors.newFixedThreadPool(1).submit(listener); f.get(30, TimeUnit.SECONDS); - + lc.shutdown(); // Check values send vs received @@ -176,9 +176,9 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase Assert.assertEquals("First tuple", "testString 1", listener.getMessage(listener.holdingBuffer.peek())); listener.close(); - + } - + public static class SimpleKafkaExactOnceOutputOperator extends AbstractExactlyOnceKafkaOutputOperator<String, String, String>{ @Override @@ -192,7 +192,7 @@ public class KafkaExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase { return new Pair<String, String>(tuple.split("###")[0], tuple.split("###")[1]); } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java index 0e3e4e5..e409353 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestPartitioner.java @@ -31,7 +31,7 @@ import kafka.utils.VerifiableProperties; public class KafkaTestPartitioner implements Partitioner { public KafkaTestPartitioner (VerifiableProperties props) { - + } @Override public int partition(Object key, int num_Partitions) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java index 0a72a2e..cbd946a 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java @@ -40,7 +40,7 @@ public class KafkaTestProducer implements Runnable private boolean hasPartition = false; private boolean hasMultiCluster = false; private List<String> messages; - + private String producerType = "async"; public int getSendCount() @@ -95,7 +95,7 @@ public class KafkaTestProducer implements Runnable producer1 = null; } } - + public KafkaTestProducer(String topic, boolean hasPartition) { this(topic, hasPartition, false); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java index f3f8478..c9948ba 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOperatorTestBase.java @@ -44,7 +44,7 @@ public class KinesisOperatorTestBase protected transient AWSCredentialsProvider credentials = null; private static final Logger logger = LoggerFactory.getLogger(KinesisOperatorTestBase.class); - + private void createClient() { credentials = new DefaultAWSCredentialsProviderChain(); @@ -56,27 +56,27 @@ public class KinesisOperatorTestBase { CreateStreamRequest streamRequest = null; createClient(); - + for( int i=0; i<100; ++i ) { - try + try { streamName = streamNamePrefix + i; streamRequest = new CreateStreamRequest(); streamRequest.setStreamName(streamName); streamRequest.setShardCount(shardCount); client.createStream(streamRequest); - + logger.info( "created stream {}.", streamName ); Thread.sleep(30000); - + break; } catch( ResourceInUseException riue ) { logger.warn( "Resource is in use.", riue.getMessage() ); } - catch (Exception e) + catch (Exception e) { logger.error( "Got exception.", e ); throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java index 368a191..b478b9f 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisOutputOperatorTest.java @@ -52,7 +52,7 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput super.beforeTest(); } - + /** * Test AbstractKinesisOutputOperator (i.e. an output adapter for Kinesis, aka producer). * This module sends data into an ActiveMQ message bus. @@ -94,7 +94,7 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput // Create ActiveMQStringSinglePortOutputOperator G generator = addGenerateOperator( dag ); - + O node = addTestingOperator( dag ); configureTestingOperator( node ); @@ -119,13 +119,13 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput } catch( Exception e ){} } - + if( listener != null ) listener.setIsAlive(false); - + if( listenerThread != null ) listenerThread.join( 1000 ); - + lc.shutdown(); // Check values send vs received @@ -141,10 +141,10 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput protected KinesisTestConsumer createConsumerListener( String streamName ) { KinesisTestConsumer listener = new KinesisTestConsumer(streamName); - + return listener; } - + protected void configureTestingOperator( O node ) { node.setAccessKey(credentials.getCredentials().getAWSAccessKeyId()); @@ -152,7 +152,7 @@ public abstract class KinesisOutputOperatorTest< O extends AbstractKinesisOutput node.setBatchSize(500); node.setStreamName(streamName); } - + protected abstract G addGenerateOperator( DAG dag ); protected abstract DefaultOutputPort getOutputPortOfGenerator( G generator ); protected abstract O addTestingOperator( DAG dag ); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java index 33f3179..f0a9eb7 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisStringOutputOperatorTest.java @@ -34,13 +34,13 @@ public class KinesisStringOutputOperatorTest extends KinesisOutputOperatorTest< return dag.addOperator("TestStringGenerator", StringGeneratorInputOperator.class); //StringGeneratorInputOperator generator = } - + @Override protected DefaultOutputPort getOutputPortOfGenerator( StringGeneratorInputOperator generator ) { return generator.outputPort; } - + @Override protected KinesisStringOutputOperator addTestingOperator(DAG dag) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java index 448ce72..a1547c1 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisTestConsumer.java @@ -48,11 +48,11 @@ public class KinesisTestConsumer implements Runnable private volatile boolean isAlive = true; private int receiveCount = 0; - + private CountDownLatch doneLatch; - + protected static final int MAX_TRY_TIMES = 30; - + private void createClient() { AWSCredentialsProvider credentials = new DefaultAWSCredentialsProviderChain(); @@ -87,16 +87,16 @@ public class KinesisTestConsumer implements Runnable buffer.get(bytes); return new String(bytes); } - + @Override public void run() { String iterator = prepareIterator(); - - while (isAlive ) + + while (isAlive ) { iterator = processNextIterator(iterator); - + //sleep at least 1 second to avoid exceeding the limit on getRecords frequency try { @@ -167,7 +167,7 @@ public class KinesisTestConsumer implements Runnable return; receiveCount += records.size(); logger.debug("ReceiveCount= {}", receiveCount); - + for( Record record : records ) { holdingBuffer.add(record); @@ -175,18 +175,18 @@ public class KinesisTestConsumer implements Runnable { processRecord( record ); } - + if( doneLatch != null ) doneLatch.countDown(); } - + } - + protected void processRecord( Record record ) { - + } - + public void close() { isAlive = false; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java index 60db81c..2ae525c 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/memcache/MemcachePOJOOperatorTest.java @@ -34,16 +34,16 @@ import net.spy.memcached.AddrUtil; public class MemcachePOJOOperatorTest { public static final int TUPLE_SIZE = 1000; - + private MemcacheStore store; - + @Before public void setup() { store = new MemcacheStore(); store.setServerAddresses(AddrUtil.getAddresses("localhost:11211") ); } - + public void cleanup() { if( store != null ) @@ -57,9 +57,9 @@ public class MemcachePOJOOperatorTest DTThrowable.rethrow(e); } } - + } - + @SuppressWarnings("unchecked") @Test public void testMemcacheOutputOperatorInternal() throws Exception @@ -74,21 +74,21 @@ public class MemcachePOJOOperatorTest operator.setTableInfo( tableInfo ); operator.setup(null); - + TupleGenerator<TestPOJO> generator = new TupleGenerator<TestPOJO>( TestPOJO.class ); - + for( int i=0; i<TUPLE_SIZE; ++i ) { operator.processTuple( generator.getNextTuple() ); } - + readDataAndVerify( operator.getStore(), generator ); } - + public void readDataAndVerify( MemcacheStore store, TupleGenerator<TestPOJO> generator ) { generator.reset(); - + for( int i=0; i<TUPLE_SIZE; ++i ) { TestPOJO expected = generator.getNextTuple(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java index aaa1e52..54c8d93 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java @@ -51,7 +51,7 @@ public class AbstractMemsqlInputOperatorTest public static final int NUM_WINDOWS = 10; public static final int DATABASE_SIZE = NUM_WINDOWS * BLAST_SIZE; public static final int OPERATOR_ID = 0; - + public static void populateDatabase(MemsqlStore memsqlStore) { memsqlStore.connect(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java index 6122477..a128181 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java +++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorBenchmark.java @@ -32,6 +32,6 @@ public class RabbitMQOutputOperatorBenchmark extends RabbitMQOutputOperatorTest public void testDag() throws Exception { runTest(100000); - logger.debug("end of test"); + logger.debug("end of test"); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java index 010c534..6dcdfbe 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java @@ -136,7 +136,7 @@ public class RedisInputOperatorTest RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator(); operator.setWindowDataManager(new FSWindowDataManager()); - + operator.setStore(operatorStore); operator.setScanCount(1); Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java index 654899a..32a4f39 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkInputOperatorTest.java @@ -26,7 +26,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.CollectorTestSink; /** - * + * * Unit test for splunk input operator. The test, queries splunk server for 100 rows and checks * how many rows are returned. * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java index 241f7e1..f1d9285 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperatorTest.java @@ -25,7 +25,7 @@ import com.datatorrent.lib.testbench.CollectorTestSink; import com.google.common.collect.Lists; /** - * + * * Unit test for splunk tcp output operator. The test sends 10 values to the splunk server and then * queries it for last 10 rows to check if the values are same or not. * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java index 2975c9c..7d34d71 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java +++ b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java @@ -35,18 +35,18 @@ import com.datatorrent.lib.util.PojoUtils.Setter; public class FieldValueSerializableGenerator< T extends FieldInfo> extends FieldValueGenerator<T> { - + public static < T extends FieldInfo > FieldValueSerializableGenerator<T> getFieldValueGenerator(final Class<?> clazz, List<T> fieldInfos) { return new FieldValueSerializableGenerator(clazz, fieldInfos); } - - + + private static final Logger logger = LoggerFactory.getLogger( FieldValueGenerator.class ); //it's better to same kryo instance for both de/serialize private Kryo _kryo = null; private Class<?> clazz; - + private FieldValueSerializableGenerator(){} public FieldValueSerializableGenerator(Class<?> clazz, List<T> fieldInfos) @@ -58,7 +58,7 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field /** * get the object which is serialized. * this method will convert the object into a map from column name to column value and then serialize it - * + * * @param obj * @return */ @@ -66,7 +66,7 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field { //if don't have field information, just convert the whole object to byte[] Object convertObj = obj; - + //if fields are specified, convert to map and then convert map to byte[] if( fieldGetterMap != null && !fieldGetterMap.isEmpty() ) { @@ -82,15 +82,15 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field return os.toByteArray(); } - + public Object deserializeObject( byte[] bytes ) { Object obj = getKryo().readClassAndObject( new Input( bytes ) ); - + if( fieldGetterMap == null || fieldGetterMap.isEmpty() ) return obj; - + // the obj in fact is a map, convert from map to object try { @@ -114,7 +114,7 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field return obj; } } - + protected Kryo getKryo() { if( _kryo == null ) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java b/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java index e2fc5cb..d9f5079 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java +++ b/contrib/src/test/java/com/datatorrent/contrib/util/POJOTupleGenerateOperator.java @@ -32,7 +32,7 @@ public class POJOTupleGenerateOperator<T> implements InputOperator, ActivationLi { protected final int DEFAULT_TUPLE_NUM = 10000; public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>(); - + private int tupleNum = DEFAULT_TUPLE_NUM; private int batchNum = 5; private TupleGenerator<T> tupleGenerator = null; @@ -42,17 +42,17 @@ public class POJOTupleGenerateOperator<T> implements InputOperator, ActivationLi public POJOTupleGenerateOperator() { } - + public POJOTupleGenerateOperator( Class<T> tupleClass ) { this.tupleClass = tupleClass; } - + public void setTupleType( Class<T> tupleClass ) { this.tupleClass = tupleClass; } - + @Override public void beginWindow(long windowId) { @@ -96,39 +96,39 @@ public class POJOTupleGenerateOperator<T> implements InputOperator, ActivationLi catch( Exception e ){} return; } - - + + for( int i=0; i<batchNum; ++i ) { int count = emitedTuples.get(); if( count >= theTupleNum ) return; - + if( emitedTuples.compareAndSet(count, count+1) ) { - T tuple = getNextTuple(); + T tuple = getNextTuple(); outputPort.emit ( tuple ); tupleEmitted( tuple ); - + if( count+1 == theTupleNum ) { tupleEmitDone(); return; } } - + } } - - + + protected void tupleEmitted( T tuple ){} protected void tupleEmitDone(){} - + public int getEmitedTupleCount() { return emitedTuples.get(); } - + public int getTupleNum() { return tupleNum; @@ -137,7 +137,7 @@ public class POJOTupleGenerateOperator<T> implements InputOperator, ActivationLi { this.tupleNum = tupleNum; } - + protected T getNextTuple() { if( tupleGenerator == null ) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java b/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java index 462c0b3..99910be 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java +++ b/contrib/src/test/java/com/datatorrent/contrib/util/TestPOJO.java @@ -40,15 +40,15 @@ public class TestPOJO implements Serializable fieldsInfo.add( new FieldInfo( "name", "name", SupportType.STRING ) ); fieldsInfo.add( new FieldInfo( "age", "age", SupportType.INTEGER ) ); fieldsInfo.add( new FieldInfo( "address", "address", SupportType.STRING ) ); - + return fieldsInfo; } - + public static String getRowExpression() { return "row"; } - + public static TestPOJO from( Map<String,byte[]> map ) { TestPOJO testPOJO = new TestPOJO(); @@ -58,14 +58,14 @@ public class TestPOJO implements Serializable } return testPOJO; } - + private Long rowId = null; private String name; private int age; private String address; public TestPOJO(){} - + public TestPOJO(long rowId) { this(rowId, "name" + rowId, (int) rowId, "address" + rowId); @@ -78,7 +78,7 @@ public class TestPOJO implements Serializable setAge(age); setAddress(address); } - + public void setValue( String fieldName, byte[] value ) { if( "row".equalsIgnoreCase(fieldName) ) @@ -148,7 +148,7 @@ public class TestPOJO implements Serializable { this.address = address; } - + @Override public boolean equals( Object obj ) { @@ -156,7 +156,7 @@ public class TestPOJO implements Serializable return false; if( !( obj instanceof TestPOJO ) ) return false; - + return completeEquals( (TestPOJO)obj ); } @@ -172,7 +172,7 @@ public class TestPOJO implements Serializable return false; return true; } - + public boolean completeEquals( TestPOJO other ) { if( other == null ) @@ -183,7 +183,7 @@ public class TestPOJO implements Serializable return false; return true; } - + public <T> boolean fieldEquals( T v1, T v2 ) { if( v1 == null && v2 == null ) @@ -192,7 +192,7 @@ public class TestPOJO implements Serializable return false; return v1.equals( v2 ); } - + @Override public String toString() { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java b/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java index 93dc189..4bd8c79 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java +++ b/contrib/src/test/java/com/datatorrent/contrib/util/TupleCacheOutputOperator.java @@ -33,10 +33,10 @@ public class TupleCacheOutputOperator<T> extends BaseOperator { private static final long serialVersionUID = 3090932382383138500L; private static final Logger logger = LoggerFactory.getLogger( TupleCacheOutputOperator.class ); - - //one instance of TupleCacheOutputOperator map to one + + //one instance of TupleCacheOutputOperator map to one private static Map< String, List<?> > receivedTuplesMap = new HashMap< String, List<?>>(); - + public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>() { @Override @@ -45,14 +45,14 @@ public class TupleCacheOutputOperator<T> extends BaseOperator processTuple( tuple ); } }; - + private String uuid; - + public TupleCacheOutputOperator() { uuid = java.util.UUID.randomUUID().toString(); } - + public String getUuid() { return uuid; @@ -74,7 +74,7 @@ public class TupleCacheOutputOperator<T> extends BaseOperator { return (List<T>)receivedTuplesMap.get(uuid); } - + public static List<Object> getReceivedTuples( String uuid ) { return (List<Object>)receivedTuplesMap.get(uuid); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java b/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java index bfce6f5..8ee38dd 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java +++ b/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerateCacheOperator.java @@ -25,16 +25,16 @@ import java.util.Map; public class TupleGenerateCacheOperator<T> extends POJOTupleGenerateOperator<T> { - //one instance of TupleCacheOutputOperator map to one + //one instance of TupleCacheOutputOperator map to one private static Map< String, List<?> > emittedTuplesMap = new HashMap< String, List<?>>(); private String uuid; - + public TupleGenerateCacheOperator() { uuid = java.util.UUID.randomUUID().toString(); } - + @SuppressWarnings("unchecked") protected void tupleEmitted( T tuple ) { @@ -46,7 +46,7 @@ public class TupleGenerateCacheOperator<T> extends POJOTupleGenerateOperator<T> } emittedTuples.add(tuple); } - + @SuppressWarnings("unchecked") public List<T> getTuples() { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java index cea81d5..844d31d 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java +++ b/contrib/src/test/java/com/datatorrent/contrib/util/TupleGenerator.java @@ -25,26 +25,26 @@ import org.slf4j.LoggerFactory; /** * This is a copy from contrib, should be merged later. - * + * */ public class TupleGenerator<T> { private static final Logger logger = LoggerFactory.getLogger( TupleGenerator.class ); - + private volatile long rowId = 0; private Constructor<T> constructor; - + private static Class<?>[] paramTypes = new Class<?>[]{ Long.class, long.class, Integer.class, int.class }; - + public TupleGenerator() { } - + public TupleGenerator( Class<T> tupleClass ) { useTupleClass( tupleClass ); } - + public void useTupleClass( Class<T> tupleClass ) { for( Class<?> paramType : paramTypes ) @@ -59,7 +59,7 @@ public class TupleGenerator<T> throw new RuntimeException( "Not found proper constructor." ); } } - + protected Constructor<T> tryGetConstructor( Class<T> tupleClass, Class<?> parameterType ) { try @@ -71,17 +71,17 @@ public class TupleGenerator<T> return null; } } - + public void reset() { rowId = 0; } - + public T getNextTuple() { if( constructor == null ) throw new RuntimeException( "Not found proper constructor." ); - + long curRowId = ++rowId; try { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java index 3538891..1bb599a 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQInputOperatorTest.java @@ -96,7 +96,7 @@ public class ZeroMQInputOperatorTest Thread.sleep(10); } else { break; - } + } } } catch (InterruptedException ex) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java index 27cd278..6155ec3 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java +++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageGenerator.java @@ -51,7 +51,7 @@ class ZeroMQMessageGenerator { public void send(Object message) { - String msg = message.toString(); + String msg = message.toString(); publisher.send(msg.getBytes(), 0); } @@ -72,8 +72,8 @@ class ZeroMQMessageGenerator { ArrayList<HashMap<String, Integer>> dataMaps = MessageQueueTestHelper.getMessages(); for(int j =0; j < dataMaps.size(); j++) { - send(dataMaps.get(j)); - } + send(dataMaps.get(j)); + } } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java index f472828..bb04817 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java +++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQMessageReceiver.java @@ -56,9 +56,9 @@ final class ZeroMQMessageReceiver implements Runnable @Override public void run() { - logger.debug("receiver running"); + logger.debug("receiver running"); while (!Thread.currentThread().isInterrupted() && !shutDown) { - //logger.debug("receiver running in loop"); + //logger.debug("receiver running in loop"); byte[] msg = subscriber.recv(ZMQ.NOBLOCK); // convert to HashMap and save the values for each key // then expect c to be 1000, b=20, a=2 @@ -68,7 +68,7 @@ final class ZeroMQMessageReceiver implements Runnable continue; } String str = new String(msg); - + if (str.indexOf("{") == -1) { continue; } @@ -85,7 +85,7 @@ final class ZeroMQMessageReceiver implements Runnable public void teardown() { shutDown=true; - + syncclient.close(); subscriber.close(); context.term(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java index b8332c0..41c5248 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/zmq/ZeroMQOutputOperatorTest.java @@ -47,7 +47,7 @@ public class ZeroMQOutputOperatorTest final int testNum = 3; runTest(testNum); - + logger.debug("end of test"); } @@ -60,7 +60,7 @@ public class ZeroMQOutputOperatorTest collector.setUrl("tcp://*:5556"); collector.setSyncUrl("tcp://*:5557"); collector.setSUBSCRIBERS_EXPECTED(1); - + dag.addStream("Stream", source.outPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL); final LocalMode.Controller lc = lma.getController(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java index 762d322..ce5ba33 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java @@ -41,11 +41,11 @@ public class FullOuterJoinOperatorTest CollectorTestSink sink = new CollectorTestSink(); oper.outport.setSink(sink); - // set column join condition + // set column join condition Condition cond = new JoinColumnEqualCondition("a", "a"); oper.setJoinCondition(cond); - - // add columns + + // add columns oper.selectTable1Column(new ColumnIndex("b", null)); oper.selectTable2Column(new ColumnIndex("c", null)); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java index 6ad818e..0d2a2f5 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/GroupByOperatorTest.java @@ -85,7 +85,7 @@ public class GroupByOperatorTest tuple.put("b", 2); tuple.put("c", 7); oper.inport.process(tuple); - + oper.endWindow(); oper.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java index 5b696f1..3c685ab 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/HavingOperatorTest.java @@ -87,7 +87,7 @@ public class HavingOperatorTest tuple.put("b", 2); tuple.put("c", 7); oper.inport.process(tuple); - + oper.endWindow(); oper.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java index 8b4f923..18312d1 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/InnerJoinOperatorTest.java @@ -30,7 +30,7 @@ import com.datatorrent.lib.streamquery.index.ColumnIndex; import com.datatorrent.lib.testbench.CollectorTestSink; /** - * + * * Functional test for {@link com.datatorrent.lib.streamquery.InnerJoinOperator }. * @deprecated */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java index f78ba21..eb1ec6d 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/LeftOuterJoinOperatorTest.java @@ -41,11 +41,11 @@ public class LeftOuterJoinOperatorTest CollectorTestSink sink = new CollectorTestSink(); oper.outport.setSink(sink); - // set column join condition + // set column join condition Condition cond = new JoinColumnEqualCondition("a", "a"); oper.setJoinCondition(cond); - - // add columns + + // add columns oper.selectTable1Column(new ColumnIndex("b", null)); oper.selectTable2Column(new ColumnIndex("c", null)); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java index 8142276..70bc031 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/RightOuterJoinOperatorTest.java @@ -42,11 +42,11 @@ public class RightOuterJoinOperatorTest CollectorTestSink sink = new CollectorTestSink(); oper.outport.setSink(sink); - // set column join condition + // set column join condition Condition cond = new JoinColumnEqualCondition("a", "a"); oper.setJoinCondition(cond); - - // add columns + + // add columns oper.selectTable1Column(new ColumnIndex("b", null)); oper.selectTable2Column(new ColumnIndex("c", null)); @@ -83,7 +83,7 @@ public class RightOuterJoinOperatorTest tuple.put("b", 11); tuple.put("c", 12); oper.inport2.process(tuple); - + oper.endWindow(); oper.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java index 90480cf..4b609c1 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/SelectTopOperatorTest.java @@ -37,20 +37,20 @@ public class SelectTopOperatorTest oper.setTopValue(2); CollectorTestSink sink = new CollectorTestSink(); oper.outport.setSink(sink); - + oper.beginWindow(1); HashMap<String, Object> tuple = new HashMap<String, Object>(); tuple.put("a", 0); tuple.put("b", 1); tuple.put("c", 2); oper.inport.process(tuple); - + tuple = new HashMap<String, Object>(); tuple.put("a", 1); tuple.put("b", 3); tuple.put("c", 4); oper.inport.process(tuple); - + tuple = new HashMap<String, Object>(); tuple.put("a", 1); tuple.put("b", 5); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java index 01465db..568aed9 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/BetweenConditionTest.java @@ -79,7 +79,7 @@ public class BetweenConditionTest tuple.put("b", 7); tuple.put("c", 8); oper.inport.process(tuple); - + oper.endWindow(); oper.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java index e160e5d..929d134 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/CompoundConditionTest.java @@ -84,7 +84,7 @@ public class CompoundConditionTest tuple.put("b", 7); tuple.put("c", 8); oper.inport.process(tuple); - + oper.endWindow(); oper.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java ---------------------------------------------------------------------- diff --git a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java index d641a1c..255389b 100644 --- a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java +++ b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/advanced/InConditionTest.java @@ -81,7 +81,7 @@ public class InConditionTest tuple.put("b", 7); tuple.put("c", 8); oper.inport.process(tuple); - + oper.endWindow(); oper.teardown(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java index b9132d8..3c74cc5 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/AverageData.java @@ -45,7 +45,7 @@ public class AverageData /** * This constructor takes the value of sum and count and initialize the local attributes to corresponding values - * + * * @param count * the value of count */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java ---------------------------------------------------------------------- diff --git a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java index a5dda7e..6f02a24 100644 --- a/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java +++ b/demos/machinedata/src/main/java/com/datatorrent/demos/machinedata/data/MachineInfo.java @@ -42,7 +42,7 @@ public class MachineInfo /** * This constructor takes MachineKey as input and initialize local attributes - * + * * @param machineKey * the MachineKey instance */ @@ -53,7 +53,7 @@ public class MachineInfo /** * This constructor takes MachineKey, cpu usage, ram usage, hdd usage as input and initialize local attributes - * + * * @param machineKey * the MachineKey instance * @param cpu @@ -73,7 +73,7 @@ public class MachineInfo /** * This method returns the MachineKey - * + * * @return */ public MachineKey getMachineKey() @@ -83,7 +83,7 @@ public class MachineInfo /** * This method sets the MachineKey - * + * * @param machineKey * the MachineKey instance */ @@ -94,7 +94,7 @@ public class MachineInfo /** * This method returns the CPU% usage - * + * * @return */ public int getCpu() @@ -104,7 +104,7 @@ public class MachineInfo /** * This method sets the CPU% usage - * + * * @param cpu * the CPU% usage */ @@ -115,7 +115,7 @@ public class MachineInfo /** * This method returns the RAM% usage - * + * * @return */ public int getRam() @@ -125,7 +125,7 @@ public class MachineInfo /** * This method sets the RAM% usage - * + * * @param ram * the RAM% usage */ @@ -136,7 +136,7 @@ public class MachineInfo /** * This method returns the HDD% usage - * + * * @return */ public int getHdd() @@ -146,7 +146,7 @@ public class MachineInfo /** * This method sets the HDD% usage - * + * * @param hdd * the HDD% usage */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java ---------------------------------------------------------------------- diff --git a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java index 8964d84..f6708ba 100644 --- a/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java +++ b/demos/mobile/src/main/java/com/datatorrent/demos/mobile/PhoneEntryOperator.java @@ -59,8 +59,8 @@ public class PhoneEntryOperator extends BaseOperator /** * Sets the initial number of phones to display on the google map. - * - * @param i the count of initial phone numbers to display + * + * @param i the count of initial phone numbers to display */ public void setInitialDisplayCount(int i) { @@ -69,8 +69,8 @@ public class PhoneEntryOperator extends BaseOperator /** * Sets the range for the phone numbers generated by the operator. - * - * @param i the range within which the phone numbers are randomly generated. + * + * @param i the range within which the phone numbers are randomly generated. */ public void setPhoneRange(Range<Integer> phoneRange) { @@ -80,7 +80,7 @@ public class PhoneEntryOperator extends BaseOperator /** * Sets the max seed for random phone number generation - * + * * @param i the number to initialize the random number phone generator. */ public void setMaxSeedPhoneNumber(int number)
