Fix trailing whitespace.

Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/763d14fc
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/763d14fc
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/763d14fc

Branch: refs/heads/master
Commit: 763d14fca6b84fdda1b6853235e5d4b71ca87fca
Parents: 90b5c9b
Author: CI Support <[email protected]>
Authored: Mon Sep 26 20:36:22 2016 -0700
Committer: CI Support <[email protected]>
Committed: Mon Sep 26 20:36:22 2016 -0700

----------------------------------------------------------------------
 .../LogstreamWidgetOutputOperator.java          |  2 +-
 .../benchmark/CouchBaseAppOutput.java           |  2 +-
 ...nchmarkPartitionableKafkaOutputOperator.java |  2 +-
 .../state/ManagedStateBenchmarkApp.java         |  2 +-
 .../benchmark/state/StoreOperator.java          | 28 ++++----
 .../state/ManagedStateBenchmarkAppTester.java   | 14 ++--
 .../contrib/avro/AvroFileInputOperator.java     |  6 +-
 .../datatorrent/contrib/avro/AvroToPojo.java    |  6 +-
 .../datatorrent/contrib/avro/PojoToAvro.java    | 10 +--
 .../AbstractElasticSearchOutputOperator.java    |  2 +-
 .../elasticsearch/ElasticSearchConnectable.java |  6 +-
 .../ElasticSearchMapInputOperator.java          |  6 +-
 .../ElasticSearchMapOutputOperator.java         | 10 +--
 .../ElasticSearchPercolatorStore.java           | 12 ++--
 .../contrib/enrich/DelimitedFSLoader.java       |  4 +-
 .../datatorrent/contrib/enrich/FSLoader.java    |  2 +-
 .../contrib/enrich/FixedWidthFSLoader.java      | 10 +--
 .../contrib/formatter/CsvFormatter.java         |  6 +-
 .../geode/AbstractGeodeInputOperator.java       |  4 +-
 .../geode/AbstractGeodeOutputOperator.java      |  4 +-
 .../contrib/geode/GeodeCheckpointStore.java     | 18 +++---
 .../geode/GeodeKeyValueStorageAgent.java        |  2 +-
 .../contrib/geode/GeodePOJOOutputOperator.java  |  2 +-
 .../datatorrent/contrib/geode/GeodeStore.java   |  6 +-
 .../contrib/geode/RegionCreateFunction.java     |  2 +-
 .../contrib/hbase/HBaseFieldInfo.java           | 34 +++++-----
 .../kafka/AbstractKafkaInputOperator.java       |  6 +-
 .../contrib/kafka/HighlevelKafkaConsumer.java   |  8 +--
 .../contrib/kafka/KafkaPartition.java           | 16 ++---
 .../contrib/kafka/OffsetManager.java            |  4 +-
 .../contrib/kafka/SimpleKafkaConsumer.java      | 10 +--
 .../kinesis/AbstractKinesisOutputOperator.java  | 12 ++--
 .../contrib/memcache/MemcacheStore.java         |  2 +-
 .../contrib/mqtt/MqttClientConfig.java          |  2 +-
 .../parquet/AbstractParquetFileReader.java      |  6 +-
 .../contrib/parquet/ParquetFilePOJOReader.java  | 10 +--
 .../contrib/parser/CellProcessorBuilder.java    | 34 +++++-----
 .../datatorrent/contrib/parser/CsvParser.java   | 12 ++--
 .../contrib/parser/DelimitedSchema.java         | 26 ++++----
 .../datatorrent/contrib/parser/JsonParser.java  | 16 ++---
 .../contrib/r/REngineConnectable.java           |  8 +--
 .../java/com/datatorrent/contrib/r/RScript.java |  8 +--
 .../rabbitmq/AbstractRabbitMQInputOperator.java | 30 ++++-----
 .../AbstractRabbitMQOutputOperator.java         | 14 ++--
 .../rabbitmq/RabbitMQOutputOperator.java        |  4 +-
 .../redis/AbstractRedisInputOperator.java       |  4 +-
 .../redis/RedisKeyValueInputOperator.java       |  2 +-
 .../redis/RedisMapAsValueInputOperator.java     |  4 +-
 .../datatorrent/contrib/redis/RedisStore.java   |  6 +-
 .../solr/AbstractSolrOutputOperator.java        |  2 +-
 .../ConcurrentUpdateSolrServerConnector.java    |  2 +-
 .../datatorrent/contrib/splunk/SplunkStore.java |  2 +-
 .../contrib/zmq/ZeroMQInputOperator.java        |  2 +-
 .../apex/malhar/contrib/misc/math/Change.java   |  4 +-
 .../contrib/misc/math/CompareExceptMap.java     |  4 +-
 .../malhar/contrib/misc/math/ExceptMap.java     |  2 +-
 .../apex/malhar/contrib/misc/math/Quotient.java |  2 +-
 .../malhar/contrib/misc/math/QuotientMap.java   |  2 +-
 .../malhar/contrib/misc/math/SumCountMap.java   |  6 +-
 .../contrib/parser/StreamingJsonParser.java     |  8 +--
 .../contrib/couchbase/CouchBaseSetTest.java     |  2 +-
 .../ElasticSearchOperatorTest.java              |  6 +-
 .../ElasticSearchPercolateTest.java             | 10 +--
 .../contrib/geode/GeodeCheckpointStoreTest.java |  2 +-
 .../hbase/HBasePOJOInputOperatorTest.java       | 34 +++++-----
 .../contrib/hbase/HBasePOJOPutOperatorTest.java | 36 +++++------
 .../HBaseTransactionalPutOperatorTest.java      | 12 ++--
 .../datatorrent/contrib/hbase/HBaseUtil.java    |  8 +--
 .../contrib/helper/MessageQueueTestHelper.java  |  2 +-
 .../KafkaExactlyOnceOutputOperatorTest.java     | 20 +++---
 .../contrib/kafka/KafkaTestPartitioner.java     |  2 +-
 .../contrib/kafka/KafkaTestProducer.java        |  4 +-
 .../kinesis/KinesisOperatorTestBase.java        | 12 ++--
 .../kinesis/KinesisOutputOperatorTest.java      | 16 ++---
 .../KinesisStringOutputOperatorTest.java        |  4 +-
 .../contrib/kinesis/KinesisTestConsumer.java    | 26 ++++----
 .../memcache/MemcachePOJOOperatorTest.java      | 20 +++---
 .../memsql/AbstractMemsqlInputOperatorTest.java |  2 +-
 .../RabbitMQOutputOperatorBenchmark.java        |  2 +-
 .../contrib/redis/RedisInputOperatorTest.java   |  2 +-
 .../contrib/splunk/SplunkInputOperatorTest.java |  2 +-
 .../splunk/SplunkTcpOutputOperatorTest.java     |  2 +-
 .../util/FieldValueSerializableGenerator.java   | 20 +++---
 .../contrib/util/POJOTupleGenerateOperator.java | 30 ++++-----
 .../com/datatorrent/contrib/util/TestPOJO.java  | 22 +++----
 .../contrib/util/TupleCacheOutputOperator.java  | 14 ++--
 .../util/TupleGenerateCacheOperator.java        |  8 +--
 .../contrib/util/TupleGenerator.java            | 20 +++---
 .../contrib/zmq/ZeroMQInputOperatorTest.java    |  2 +-
 .../contrib/zmq/ZeroMQMessageGenerator.java     |  6 +-
 .../contrib/zmq/ZeroMQMessageReceiver.java      |  8 +--
 .../contrib/zmq/ZeroMQOutputOperatorTest.java   |  4 +-
 .../streamquery/FullOuterJoinOperatorTest.java  |  6 +-
 .../misc/streamquery/GroupByOperatorTest.java   |  2 +-
 .../misc/streamquery/HavingOperatorTest.java    |  2 +-
 .../misc/streamquery/InnerJoinOperatorTest.java |  2 +-
 .../streamquery/LeftOuterJoinOperatorTest.java  |  6 +-
 .../streamquery/RightOuterJoinOperatorTest.java |  8 +--
 .../misc/streamquery/SelectTopOperatorTest.java |  6 +-
 .../advanced/BetweenConditionTest.java          |  2 +-
 .../advanced/CompoundConditionTest.java         |  2 +-
 .../streamquery/advanced/InConditionTest.java   |  2 +-
 .../demos/machinedata/data/AverageData.java     |  2 +-
 .../demos/machinedata/data/MachineInfo.java     | 20 +++---
 .../demos/mobile/PhoneEntryOperator.java        | 10 +--
 .../datatorrent/demos/pi/CalculatorTest.java    |  2 +-
 .../hive/AbstractFSRollingOutputOperator.java   |  2 +-
 .../datatorrent/contrib/hive/HiveOperator.java  | 14 ++--
 .../apex/malhar/hive/HiveOutputModule.java      | 52 +++++++--------
 .../malhar/kafka/AbstractKafkaPartitioner.java  | 12 ++--
 .../apex/malhar/kafka/KafkaConsumerWrapper.java |  4 +-
 .../malhar/kafka/KafkaInputOperatorTest.java    | 44 ++++++-------
 .../apex/malhar/kafka/KafkaTestPartitioner.java |  2 +-
 .../apex/malhar/kafka/KafkaTestProducer.java    |  8 +--
 .../lib/appdata/gpo/SerdeMapPrimitive.java      |  4 +-
 .../lib/appdata/query/WindowBoundedService.java |  2 +-
 .../schemas/DimensionalConfigurationSchema.java |  2 +-
 .../lib/appdata/schemas/DimensionalSchema.java  | 14 ++--
 .../datatorrent/lib/appdata/schemas/Schema.java |  2 +-
 .../lib/appdata/schemas/SnapshotSchema.java     |  2 +-
 .../snapshot/AbstractAppDataSnapshotServer.java | 18 +++---
 .../lib/bandwidth/BandwidthPartitioner.java     |  4 +-
 .../com/datatorrent/lib/codec/package-info.java |  2 +-
 .../datatorrent/lib/converter/Converter.java    |  4 +-
 .../com/datatorrent/lib/db/KeyValueStore.java   |  2 +-
 .../db/jdbc/AbstractJdbcPOJOOutputOperator.java |  2 +-
 .../db/jdbc/AbstractJdbcPollInputOperator.java  |  6 +-
 .../lib/db/jdbc/JdbcPollInputOperator.java      |  2 +-
 .../datatorrent/lib/fileaccess/TFileImpl.java   | 28 ++++----
 .../datatorrent/lib/fileaccess/TFileWriter.java |  6 +-
 .../datatorrent/lib/filter/FilterOperator.java  |  4 +-
 .../datatorrent/lib/formatter/Formatter.java    |  6 +-
 .../lib/formatter/JsonFormatter.java            |  2 +-
 .../datatorrent/lib/formatter/XmlFormatter.java | 12 ++--
 .../lib/io/AbstractFTPInputOperator.java        |  2 +-
 .../datatorrent/lib/io/block/BlockWriter.java   |  8 +--
 .../datatorrent/lib/io/block/FSSliceReader.java |  2 +-
 .../lib/io/fs/AbstractFileSplitter.java         |  2 +-
 .../io/fs/AbstractSingleFileOutputOperator.java | 12 ++--
 .../com/datatorrent/lib/io/fs/FileMerger.java   |  2 +-
 .../com/datatorrent/lib/io/fs/FileStitcher.java | 26 ++++----
 .../lib/io/fs/FilterStreamCodec.java            |  4 +-
 .../lib/io/fs/FilterStreamContext.java          |  2 +-
 .../lib/io/fs/FilterStreamProvider.java         | 14 ++--
 .../lib/io/fs/HDFSFileCopyModule.java           | 12 ++--
 .../datatorrent/lib/io/fs/HDFSFileMerger.java   |  6 +-
 .../com/datatorrent/lib/io/fs/Synchronizer.java |  2 +-
 .../AbstractJMSSinglePortOutputOperator.java    |  2 +-
 .../com/datatorrent/lib/io/jms/JMSBase.java     |  8 +--
 .../lib/math/AbstractAggregateCalc.java         |  2 +-
 .../AbstractXmlKeyValueCartesianProduct.java    |  4 +-
 .../java/com/datatorrent/lib/math/Division.java | 38 +++++------
 .../java/com/datatorrent/lib/math/Margin.java   |  2 +-
 .../com/datatorrent/lib/math/MarginMap.java     |  6 +-
 .../main/java/com/datatorrent/lib/math/Min.java |  4 +-
 .../lib/math/MultiplyByConstant.java            | 10 +--
 .../datatorrent/lib/math/RunningAverage.java    |  2 +-
 .../java/com/datatorrent/lib/math/Sigma.java    |  2 +-
 .../math/SingleVariableAbstractCalculus.java    |  2 +-
 .../main/java/com/datatorrent/lib/math/Sum.java |  2 +-
 .../math/XmlKeyValueStringCartesianProduct.java |  4 +-
 .../com/datatorrent/lib/math/package-info.java  |  4 +-
 .../java/com/datatorrent/lib/parser/Parser.java |  6 +-
 .../com/datatorrent/lib/parser/XmlParser.java   |  2 +-
 .../lib/projection/ProjectionOperator.java      |  2 +-
 .../datatorrent/lib/script/ScriptOperator.java  |  8 +--
 .../lib/util/AbstractKeyValueStorageAgent.java  | 20 +++---
 .../lib/util/StorageAgentKeyValueStore.java     | 10 +--
 .../com/datatorrent/lib/util/TableInfo.java     |  4 +-
 .../java/com/datatorrent/lib/util/TopNSort.java |  2 +-
 .../com/datatorrent/lib/util/package-info.java  |  2 +-
 .../malhar/lib/dedup/BoundedDedupOperator.java  | 14 ++--
 .../aggregator/AbstractCompositeAggregator.java | 14 ++--
 .../AbstractCompositeAggregatorFactory.java     |  2 +-
 .../AbstractIncrementalAggregator.java          |  2 +-
 .../aggregator/AbstractTopBottomAggregator.java | 41 ++++++------
 .../aggregator/AggregatorRegistry.java          | 24 +++----
 .../dimensions/aggregator/AggregatorUtils.java  |  2 +-
 .../aggregator/CompositeAggregator.java         | 10 +--
 .../aggregator/CompositeAggregatorFactory.java  |  4 +-
 .../DefaultCompositeAggregatorFactory.java      |  8 +--
 .../aggregator/TopBottomAggregatorFactory.java  | 14 ++--
 .../apex/malhar/lib/fs/FSRecordReader.java      | 10 +--
 .../malhar/lib/fs/FSRecordReaderModule.java     | 22 +++----
 .../managed/IncrementalCheckpointManager.java   |  2 +-
 .../state/spillable/TimeBasedPriorityQueue.java |  6 +-
 .../malhar/lib/wal/FSWindowDataManager.java     | 12 ++--
 .../apex/malhar/lib/wal/FSWindowReplayWAL.java  |  4 +-
 .../apex/malhar/lib/wal/FileSystemWAL.java      |  2 +-
 .../malhar/lib/window/accumulation/Average.java |  8 +--
 .../malhar/lib/window/accumulation/Group.java   |  8 +--
 .../malhar/lib/window/accumulation/Max.java     | 14 ++--
 .../malhar/lib/window/accumulation/Min.java     | 14 ++--
 .../window/accumulation/RemoveDuplicates.java   |  8 +--
 .../lib/window/accumulation/SumDouble.java      |  8 +--
 .../lib/window/accumulation/SumFloat.java       |  8 +--
 .../malhar/lib/window/accumulation/SumInt.java  |  8 +--
 .../malhar/lib/window/accumulation/SumLong.java |  8 +--
 .../apache/hadoop/io/file/tfile/DTBCFile.java   | 68 ++++++++++----------
 .../tfile/ReusableByteArrayInputStream.java     |  8 +--
 .../lib/algo/BottomNUnifierTest.java            |  6 +-
 .../MapToKeyValuePairConverterTest.java         | 14 ++--
 .../StringValueToNumberConverterForMapTest.java | 14 ++--
 .../lib/db/cache/CacheStoreTest.java            |  4 +-
 .../datatorrent/lib/db/jdbc/JdbcIOAppTest.java  |  2 +-
 .../lib/db/jdbc/JdbcOperatorTest.java           |  6 +-
 .../com/datatorrent/lib/filter/FilterTest.java  |  8 +--
 .../lib/formatter/XmlFormatterTest.java         |  2 +-
 .../io/fs/AbstractFileInputOperatorTest.java    |  2 +-
 .../io/fs/AbstractFileOutputOperatorTest.java   | 24 +++----
 .../AbstractSingleFileOutputOperatorTest.java   |  2 +-
 .../lib/io/fs/FastMergerDecisionMakerTest.java  | 12 ++--
 .../lib/io/fs/FileSplitterInputTest.java        | 24 +++----
 .../datatorrent/lib/io/fs/SynchronizerTest.java |  2 +-
 .../lib/io/fs/TailFsInputOperatorTest.java      |  4 +-
 .../logs/FilteredLineToTokenHashMapTest.java    |  2 +-
 .../lib/logs/LineToTokenArrayListTest.java      |  4 +-
 .../lib/logs/LineToTokenHashMapTest.java        |  2 +-
 .../lib/logs/LineTokenizerKeyValTest.java       |  2 +-
 .../MultiWindowDimensionAggregationTest.java    |  2 +-
 .../com/datatorrent/lib/math/MarginMapTest.java |  4 +-
 .../com/datatorrent/lib/math/SigmaTest.java     |  2 +-
 .../lib/multiwindow/SortedMovingWindowTest.java | 32 ++++-----
 .../datatorrent/lib/parser/XmlParserTest.java   |  2 +-
 .../lib/statistics/MeridianOperatorTest.java    |  2 +-
 .../lib/statistics/ModeOperatorTest.java        |  2 +-
 .../StandardDeviationOperatorTest.java          |  4 +-
 .../statistics/WeightedMeanOperatorTest.java    |  2 +-
 .../lib/stream/DevNullCounterTest.java          |  4 +-
 .../com/datatorrent/lib/stream/DevNullTest.java |  2 +-
 .../lib/testbench/ActiveMQMessageGenerator.java |  2 +-
 .../lib/testbench/RandomEventGeneratorTest.java |  2 +-
 .../com/datatorrent/lib/util/TestUtils.java     |  2 +-
 .../lib/dedup/DeduperPartitioningTest.java      |  4 +-
 .../apex/malhar/lib/fs/FSRecordReaderTest.java  |  4 +-
 .../lib/fs/GenericFileOutputOperatorTest.java   |  6 +-
 .../SpillableByteArrayListMultimapImplTest.java |  6 +-
 .../SpillableComplexComponentImplTest.java      |  2 +-
 .../spillable/SpillableSetMultimapImplTest.java |  6 +-
 .../malhar/lib/wal/FSWindowDataManagerTest.java | 14 ++--
 .../lib/window/accumulation/AverageTest.java    |  2 +-
 .../lib/window/accumulation/FoldFnTest.java     | 26 ++++----
 .../lib/window/accumulation/GroupTest.java      |  2 +-
 .../malhar/lib/window/accumulation/MaxTest.java |  6 +-
 .../malhar/lib/window/accumulation/MinTest.java |  6 +-
 .../lib/window/accumulation/ReduceFnTest.java   |  6 +-
 .../accumulation/RemoveDuplicatesTest.java      |  2 +-
 .../malhar/lib/window/accumulation/SumTest.java |  8 +--
 .../lib/window/accumulation/TopNByKeyTest.java  | 26 ++++----
 .../apache/hadoop/io/file/tfile/TestDTFile.java |  4 +-
 .../io/file/tfile/TestDTFileByteArrays.java     |  4 +-
 .../io/file/tfile/TestTFileComparator2.java     |  8 +--
 .../io/file/tfile/TestTFileComparators.java     |  4 +-
 .../TestTFileJClassComparatorByteArrays.java    |  6 +-
 .../file/tfile/TestTFileLzoCodecsStreams.java   |  2 +-
 ...ileNoneCodecsJClassComparatorByteArrays.java |  4 +-
 .../hadoop/io/file/tfile/TestTFileSeek.java     | 22 +++----
 .../file/tfile/TestTFileSeqFileComparison.java  |  2 +-
 .../hadoop/io/file/tfile/TestTFileSplit.java    | 12 ++--
 .../hadoop/io/file/tfile/TestTFileStreams.java  |  6 +-
 .../FunctionOperator/FunctionOperatorTest.java  | 12 ++--
 261 files changed, 1092 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java
 
b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java
index 1b94532..29c92e6 100644
--- 
a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java
+++ 
b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java
@@ -128,7 +128,7 @@ public class LogstreamWidgetOutputOperator extends 
WidgetOutputOperator
     {
       @SuppressWarnings("unchecked")
       HashMap<String, Object>[] result = (HashMap<String, 
Object>[])Array.newInstance(HashMap.class, topNMap.size());
-      
+
       int j = 0;
       for (Entry<String, Number> e : topNMap.entrySet()) {
         result[j] = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java 
b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
index 1a24984..f789d08 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
  *
  * Application to benchmark the performance of couchbase output operator.
  * The number of tuples processed per second were around 20,000.
- * 
+ *
  * @since 2.0.0
  */
 @ApplicationAnnotation(name = "CouchBaseAppOutput")

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
 
b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
index c60e99a..1126ac1 100644
--- 
a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
+++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
@@ -55,7 +55,7 @@ public class BenchmarkPartitionableKafkaOutputOperator 
implements Partitioner<Be
 {
 
   private static final Logger logger = 
LoggerFactory.getLogger(BenchmarkPartitionableKafkaOutputOperator.class);
-  
+
   private String topic = "benchmark";
 
   @Min(1)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
 
b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index 7d9c3ba..eab02db 100644
--- 
a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ 
b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -53,7 +53,7 @@ public class ManagedStateBenchmarkApp implements 
StreamingApplication
 
   protected StoreOperator storeOperator;
   protected int timeRange = 1000 * 60; // one minute range of hot keys
-  
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java 
b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 3298543..2748c29 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -48,7 +48,7 @@ public class StoreOperator extends BaseOperator implements 
Operator.CheckpointNo
     UPDATESYNC,
     UPDATEASYNC
   }
-  
+
   protected static final int numOfWindowPerStatistics = 10;
 
   //this is the store we are going to use
@@ -60,10 +60,10 @@ public class StoreOperator extends BaseOperator implements 
Operator.CheckpointNo
   protected long tupleCount = 0;
   protected int windowCountPerStatistics = 0;
   protected long statisticsBeginTime = 0;
-  
+
   protected ExecMode execMode = ExecMode.INSERT;
   protected int timeRange = 1000 * 60;
-  
+
   public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = 
new DefaultInputPort<KeyValPair<byte[], byte[]>>()
   {
     @Override
@@ -102,9 +102,9 @@ public class StoreOperator extends BaseOperator implements 
Operator.CheckpointNo
 
   protected transient Queue<Future<Slice>> taskQueue = new 
LinkedList<Future<Slice>>();
   protected transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> 
taskToPair = Maps.newHashMap();
-  
+
   /**
-   * we verify 3 type of operation 
+   * we verify 3 type of operation
    * @param tuple
    */
   protected void processTuple(KeyValPair<byte[], byte[]> tuple)
@@ -119,21 +119,21 @@ public class StoreOperator extends BaseOperator 
implements Operator.CheckpointNo
         store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey()));
         insertValueToStore(tuple);
         break;
-        
+
       default: //insert
         insertValueToStore(tuple);
     }
   }
-  
+
   protected long getTimeByKey(byte[] key)
   {
     long lKey = ByteBuffer.wrap(key).getLong();
     return lKey - (lKey % timeRange);
   }
-  
+
   // give a barrier to avoid used up memory
   protected final int taskBarrier = 100000;
-  
+
   /**
    * This method first send request of get to the state manager, then handle 
all the task(get) which already done and update the value.
    * @param tuple
@@ -143,14 +143,14 @@ public class StoreOperator extends BaseOperator 
implements Operator.CheckpointNo
     if (taskQueue.size() > taskBarrier) {
       //slow down to avoid too much task waiting.
       try {
-        
+
         logger.info("Queue Size: {}, wait time(milli-seconds): {}", 
taskQueue.size(), taskQueue.size() / taskBarrier);
         Thread.sleep(taskQueue.size() / taskBarrier);
       } catch (Exception e) {
         //ignore
       }
     }
-    
+
     //send request of get to the state manager and add to the taskQueue and 
taskToPair.
     //the reason of an extra taskQueue to make sure the tasks are ordered
     {
@@ -171,7 +171,7 @@ public class StoreOperator extends BaseOperator implements 
Operator.CheckpointNo
       insertValueToStore(taskToPair.remove(task));
     }
   }
-  
+
   protected void insertValueToStore(KeyValPair<byte[], byte[]> tuple)
   {
     Slice key = new Slice(tuple.getKey());
@@ -232,7 +232,7 @@ public class StoreOperator extends BaseOperator implements 
Operator.CheckpointNo
   {
     return execMode.name();
   }
-  
+
   public void setExecModeStr(String execModeStr)
   {
     //this method used for set configuration. so, use case-insensitive
@@ -252,5 +252,5 @@ public class StoreOperator extends BaseOperator implements 
Operator.CheckpointNo
   {
     this.timeRange = timeRange;
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git 
a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
 
b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
index 93a7720..4435aad 100644
--- 
a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
+++ 
b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
@@ -39,31 +39,31 @@ import 
com.datatorrent.benchmark.state.StoreOperator.ExecMode;
 public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
 {
   public static final String basePath = "target/temp";
-  
+
   @Before
   public void before()
   {
     FileUtil.fullyDelete(new File(basePath));
   }
-  
+
   @Test
   public void testUpdateSync() throws Exception
   {
     test(ExecMode.UPDATESYNC);
   }
-  
+
   @Test
   public void testUpdateAsync() throws Exception
   {
     test(ExecMode.UPDATEASYNC);
   }
-  
+
   @Test
   public void testInsert() throws Exception
   {
     test(ExecMode.INSERT);
   }
-  
+
   public void test(ExecMode exeMode) throws Exception
   {
     Configuration conf = new Configuration(false);
@@ -73,7 +73,7 @@ public class ManagedStateBenchmarkAppTester extends 
ManagedStateBenchmarkApp
 
     super.populateDAG(dag, conf);
     storeOperator.execMode = exeMode;
-    
+
     StreamingApplication app = new StreamingApplication()
     {
       @Override
@@ -92,7 +92,7 @@ public class ManagedStateBenchmarkAppTester extends 
ManagedStateBenchmarkApp
   }
 
 
-  
+
   @Override
   public String getStoreBasePath(Configuration conf)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java 
b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
index b03e31a..01e99d3 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
@@ -50,7 +50,7 @@ import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
  * input file<br>
  * Users can add the {@link FSWindowDataManager}
  * to ensure exactly once semantics with a HDFS backed WAL.
- * 
+ *
  * @displayName AvroFileInputOperator
  * @category Input
  * @tags fs, file,avro, input operator
@@ -81,7 +81,7 @@ public class AvroFileInputOperator extends 
AbstractFileInputOperator<GenericReco
 
   /**
    * Returns a input stream given a file path
-   * 
+   *
    * @param path
    * @return InputStream
    * @throws IOException
@@ -101,7 +101,7 @@ public class AvroFileInputOperator extends 
AbstractFileInputOperator<GenericReco
   /**
    * Reads a GenericRecord from the given input stream<br>
    * Emits the FileName,Offset,Exception on the error port if its connected
-   * 
+   *
    * @return GenericRecord
    */
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java 
b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
index ad54491..1951c1e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
@@ -140,7 +140,7 @@ public class AvroToPojo extends BaseOperator
 
   /**
    * Returns a POJO from a Generic Record
-   * 
+   *
    * @return Object
    */
   @SuppressWarnings("unchecked")
@@ -220,7 +220,7 @@ public class AvroToPojo extends BaseOperator
   /**
    * Use reflection to generate field info values if the user has not provided
    * the inputs mapping
-   * 
+   *
    * @return String representing the POJO field to Avro field mapping
    */
   private String generateFieldInfoInputs(Class<?> cls)
@@ -240,7 +240,7 @@ public class AvroToPojo extends BaseOperator
   /**
    * Creates a map representing fieldName in POJO:field in Generic Record:Data
    * type
-   * 
+   *
    * @return List of FieldInfo
    */
   private List<FieldInfo> createFieldInfoMap(String str)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java 
b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
index 5fd7ee2..2f8fb19 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
@@ -97,7 +97,7 @@ public class PojoToAvro extends BaseOperator
 
   /**
    * Returns the schema string for Avro Generic Record
-   * 
+   *
    * @return schemaString
    */
   public String getSchemaString()
@@ -115,7 +115,7 @@ public class PojoToAvro extends BaseOperator
 
   /**
    * Returns the schema object
-   * 
+   *
    * @return schema
    */
   private Schema getSchema()
@@ -133,7 +133,7 @@ public class PojoToAvro extends BaseOperator
 
   /**
    * Returns the list for field names from provided Avro schema
-   * 
+   *
    * @return List of Fields
    */
   private List<Field> getColumnNames()
@@ -151,7 +151,7 @@ public class PojoToAvro extends BaseOperator
 
   /**
    * This method generates the getters for provided field of a given class
-   * 
+   *
    * @return Getter
    */
   private Getter<?, ?> generateGettersForField(Class<?> cls, String 
inputFieldName)
@@ -232,7 +232,7 @@ public class PojoToAvro extends BaseOperator
 
   /**
    * Returns a generic record mapping the POJO fields to provided schema
-   * 
+   *
    * @return Generic Record
    */
   private GenericRecord getGenericRecord(Object tuple) throws Exception

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java
index 7753108..0282ae8 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java
@@ -57,7 +57,7 @@ import com.datatorrent.lib.db.AbstractStoreOutputOperator;
  * @displayName Elastic Search Output
  * @category Output
  * @tags elastic search
- * 
+ *
  * @since 2.1.0
  */
 public abstract class AbstractElasticSearchOutputOperator<T, S extends 
ElasticSearchConnectable> extends AbstractStoreOutputOperator<T, S>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java
 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java
index fdf4e62..34eca95 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java
@@ -90,7 +90,7 @@ public class ElasticSearchConnectable implements Connectable
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see com.datatorrent.lib.db.Connectable#connect()
    */
   @Override
@@ -102,7 +102,7 @@ public class ElasticSearchConnectable implements Connectable
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see com.datatorrent.lib.db.Connectable#disconnect()
    */
   @Override
@@ -115,7 +115,7 @@ public class ElasticSearchConnectable implements Connectable
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see com.datatorrent.lib.db.Connectable#isConnected()
    */
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java
index 024b098..dcbee9d 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java
@@ -40,7 +40,7 @@ public abstract class ElasticSearchMapInputOperator<T extends 
Map<String, Object
   protected String type;
 
   /**
-   * 
+   *
    */
   public ElasticSearchMapInputOperator()
   {
@@ -49,7 +49,7 @@ public abstract class ElasticSearchMapInputOperator<T extends 
Map<String, Object
 
   /**
    * {@link SearchRequestBuilder} properties which do not change for each 
window are set during operator initialization.
-   * 
+   *
    * @see 
com.datatorrent.contrib.elasticsearch.AbstractElasticSearchInputOperator#setup(com.datatorrent.api.Context.OperatorContext)
    */
   @Override
@@ -61,7 +61,7 @@ public abstract class ElasticSearchMapInputOperator<T extends 
Map<String, Object
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see
    * 
com.datatorrent.contrib.elasticsearch.AbstractElasticSearchInputOperator#convertToTuple(org.elasticsearch.search
    * .SearchHit)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java
index 51a4688..8616938 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java
@@ -38,7 +38,7 @@ public class ElasticSearchMapOutputOperator<T extends 
Map<String, Object>> exten
   private String type;
 
   /**
-   * 
+   *
    */
   public ElasticSearchMapOutputOperator()
   {
@@ -48,7 +48,7 @@ public class ElasticSearchMapOutputOperator<T extends 
Map<String, Object>> exten
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see
    * 
com.datatorrent.contrib.elasticsearch.AbstractElasticSearchOutputOperator#setSource(org.elasticsearch.action.index
    * .IndexRequestBuilder, java.lang.Object)
@@ -61,7 +61,7 @@ public class ElasticSearchMapOutputOperator<T extends 
Map<String, Object>> exten
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see 
com.datatorrent.contrib.elasticsearch.AbstractElasticSearchOutputOperator#getId(java.lang.Object)
    */
   @Override
@@ -103,7 +103,7 @@ public class ElasticSearchMapOutputOperator<T extends 
Map<String, Object>> exten
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see 
com.datatorrent.contrib.elasticsearch.AbstractElasticSearchOutputOperator#getIndexName(java.lang.Object)
    */
   @Override
@@ -128,5 +128,5 @@ public class ElasticSearchMapOutputOperator<T extends 
Map<String, Object>> exten
   protected String getType(T tuple)
   {
     return type;
-  }  
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java
 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java
index 7d88336..c13c025 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java
@@ -28,7 +28,7 @@ import org.elasticsearch.index.query.QueryBuilder;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
- * 
+ *
  * @since 2.1.0
  */
 public class ElasticSearchPercolatorStore extends ElasticSearchConnectable
@@ -44,7 +44,7 @@ public class ElasticSearchPercolatorStore extends 
ElasticSearchConnectable
   public void registerPercolateQuery(String indexName, String queryName, 
QueryBuilder queryBuilder)
   {
     try {
-      
+
       client.prepareIndex(indexName, PERCOLATOR_TYPE, queryName)
         .setSource(XContentFactory.jsonBuilder()
             .startObject()
@@ -52,22 +52,22 @@ public class ElasticSearchPercolatorStore extends 
ElasticSearchConnectable
             .endObject())
         .setRefresh(true)
         .execute().actionGet();
-      
+
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
   }
-  
+
   public PercolateResponse percolate(String[] indexNames, String documentType, 
Object tuple){
     XContentBuilder docBuilder;
     try {
-      
+
       docBuilder = XContentFactory.jsonBuilder().startObject();
       docBuilder.field("doc").startObject(); //This is needed to designate the 
document
       docBuilder.field("content", tuple);
       docBuilder.endObject(); //End of the doc field
       docBuilder.endObject();//End of the JSON root object
-      
+
       return client.preparePercolate().setIndices(indexNames)
           .setDocumentType(documentType)
           .setSource(docBuilder)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java 
b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
index 9fa7129..3121cf1 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
@@ -146,7 +146,7 @@ public class DelimitedFSLoader extends FSLoader
 
   /**
    * Get the schema
-   * 
+   *
    * @return
    */
   public String getSchema()
@@ -156,7 +156,7 @@ public class DelimitedFSLoader extends FSLoader
 
   /**
    * Set the schema
-   * 
+   *
    * @param schema
    */
   public void setSchema(String schema)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java 
b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
index 997243d..464fa99 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
@@ -117,7 +117,7 @@ public abstract class FSLoader extends ReadOnlyBackup
    * a record. Concrete implementations override this method to parse a record
    * and convert it to Map of field names and values OR simply returns null to
    * skip the records.
-   * 
+   *
    * @param line
    *          A single record from file
    * @return a map with field name and value. Null value if returned is ignored

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java 
b/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
index 8b7eac0..d37ce3e 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
@@ -94,7 +94,7 @@ public class FixedWidthFSLoader extends FSLoader
 
   /**
    * Set to true if file has header
-   * 
+   *
    * @param hasHeader
    *          Indicates whether first line of the file is a header. Default is
    *          false
@@ -106,7 +106,7 @@ public class FixedWidthFSLoader extends FSLoader
 
   /**
    * Gets the field description
-   * 
+   *
    * @return fieldDescription. String specifying information related to fields
    *         in fixed-width file.
    */
@@ -117,7 +117,7 @@ public class FixedWidthFSLoader extends FSLoader
 
   /**
    * Sets fieldDescription
-   * 
+   *
    * @param fieldDescription
    *          a String specifying information related to fields in fixed-width
    *          file. Format is [NAME]:[FIELD_TYPE]:[WIDTH]:[date format if
@@ -135,7 +135,7 @@ public class FixedWidthFSLoader extends FSLoader
   /**
    * Gets the character used for padding in the fixed-width file.Default is
    * white space (' ')
-   * 
+   *
    * @return Padding character. Default is white space.
    */
   public char getPadding()
@@ -146,7 +146,7 @@ public class FixedWidthFSLoader extends FSLoader
   /**
    * Sets the character used for padding in fixed-width file.Default is white
    * space (' ')
-   * 
+   *
    * @param padding
    *          Padding character. Default is white space.
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java 
b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
index 34ba49c..2979b44 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
@@ -58,7 +58,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  * <b>in</b>:input tuple as a POJO. Each tuple represents a record<br>
  * <b>out</b>:tuples are are converted to string are emitted on this port<br>
  * <b>err</b>:tuples that could not be converted are emitted on this port<br>
- * 
+ *
  * @displayName CsvFormatter
  * @category Formatter
  * @tags pojo csv formatter
@@ -180,7 +180,7 @@ public class CsvFormatter extends Formatter<String>
 
   /**
    * Get the schema
-   * 
+   *
    * @return schema
    */
   public String getSchema()
@@ -190,7 +190,7 @@ public class CsvFormatter extends Formatter<String>
 
   /**
    * Set the schema
-   * 
+   *
    * @param schema
    */
   public void setSchema(String schema)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
index bc8c1f0..497e6e4 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
@@ -25,14 +25,14 @@ import 
com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
  * concrete operator should be created from this skeleton implementation.
  * <p>
  * </p>
- * 
+ *
  * @displayName Abstract Geode Input
  * @category Input
  * @tags geode, key value
  *
  * @param <T>
  *          The tuple type.
- * 
+ *
  *
  * @since 3.4.0
  */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
index 7b0e158..dd0bad2 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
@@ -25,14 +25,14 @@ import com.datatorrent.lib.db.AbstractStoreOutputOperator;
  * operator should be created from this skeleton implementation.
  * <p>
  * </p>
- * 
+ *
  * @displayName Abstract Geode Output
  * @category Output
  * @tags geode, key value
  *
  * @param <T>
  *          The tuple type.
- * 
+ *
  *
  * @since 3.4.0
  */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
index edd07d9..2152b97 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
@@ -47,7 +47,7 @@ import java.util.Map.Entry;
 /**
  * Geode Store implementation of {@link StorageAgentKeyValueStore} Uses {@link 
Kryo}
  * serialization to store retrieve objects
- * 
+ *
  *
  *
  * @since 3.4.0
@@ -56,7 +56,7 @@ public class GeodeCheckpointStore
     implements StorageAgentKeyValueStore, Serializable
 {
 
-  public static final String GET_KEYS_QUERY = 
+  public static final String GET_KEYS_QUERY =
       "SELECT entry.key FROM /$[region}.entries entry WHERE entry.key LIKE 
'${operator.id}%'";
 
   private String geodeLocators;
@@ -82,7 +82,7 @@ public class GeodeCheckpointStore
 
   /**
    * Initializes Geode store by using locator connection string
-   * 
+   *
    * @param locatorString
    */
   public GeodeCheckpointStore(String locatorString)
@@ -101,7 +101,7 @@ public class GeodeCheckpointStore
 
   /**
    * Get the Geode locator connection string
-   * 
+   *
    * @return locator connection string
    */
   public String getGeodeLocators()
@@ -111,7 +111,7 @@ public class GeodeCheckpointStore
 
   /**
    * Sets the Geode locator string
-   * 
+   *
    * @param geodeLocators
    */
   public void setGeodeLocators(String geodeLocators)
@@ -160,7 +160,7 @@ public class GeodeCheckpointStore
 
   /**
    * Creates a region
-   * 
+   *
    */
   public synchronized void createRegion()
   {
@@ -185,7 +185,7 @@ public class GeodeCheckpointStore
 
   /**
    * Check if store is connected to configured Geode cluster or not
-   * 
+   *
    * @return True is connected to Geode cluster and client cache is active
    */
   @Override
@@ -199,7 +199,7 @@ public class GeodeCheckpointStore
 
   /**
    * Return the value for specified key from Geode region
-   * 
+   *
    * @return the value object
    */
   @Override
@@ -252,7 +252,7 @@ public class GeodeCheckpointStore
 
   /**
    * Get list for keys starting from provided key name
-   * 
+   *
    * @return List of keys
    */
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
index fdfd4ce..691c2c1 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
@@ -27,7 +27,7 @@ import com.datatorrent.lib.util.AbstractKeyValueStorageAgent;
 /**
  * Storage Agent implementation which uses {@link GeodeCheckpointStore} for 
operator
  * checkpointing
- * 
+ *
  *
  *
  * @since 3.4.0

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
index defaa54..c7d22c7 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
@@ -30,7 +30,7 @@ import com.datatorrent.lib.util.TableInfo;
  * @displayName Geode Output Operator
  * @category Output
  * @tags pojo, geode
- * 
+ *
  *
  * @since 3.4.0
  */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java 
b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
index bdb7add..d345661 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
@@ -51,14 +51,14 @@ import com.datatorrent.lib.db.KeyValueStore;
  *  that provides reliable asynchronous event notifications and guaranteed 
message delivery.
  * Geode is a data management platform that provides real-time
  * , consistent access to data-intensive applications.
- * 
+ *
  *
  * @since 3.4.0
  */
 public class GeodeStore implements KeyValueStore, Serializable
 {
   /**
-   * 
+   *
    */
   private static final long serialVersionUID = -5076452548893319967L;
   private static final Logger logger = 
LoggerFactory.getLogger(GeodeStore.class);
@@ -198,7 +198,7 @@ public class GeodeStore implements KeyValueStore, 
Serializable
     try {
       return (getRegion().get(key));
     } catch (IOException ex) {
-      throw new RuntimeException("Exception while getting the object", ex);    
  
+      throw new RuntimeException("Exception while getting the object", ex);
 
     }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java 
b/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
index bc808ad..9e948c4 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
@@ -33,7 +33,7 @@ import com.gemstone.gemfire.cache.execute.FunctionContext;
 
 /**
  * Function to create region dynamically through client API
- * 
+ *
  *
  * @since 3.4.0
  */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java 
b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java
index a43b893..6a34a91 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java
@@ -29,11 +29,11 @@ import com.datatorrent.lib.util.FieldInfo;
 public class HBaseFieldInfo extends FieldInfo
 {
        private String familyName;
-       
+
        public HBaseFieldInfo()
        {
        }
-       
+
        public HBaseFieldInfo( String columnName, String columnExpression, 
SupportType type, String familyName )
        {
          super( columnName, columnExpression, type );
@@ -49,7 +49,7 @@ public class HBaseFieldInfo extends FieldInfo
        {
                this.familyName = familyName;
        }
-       
+
        public byte[] toBytes( Object value )
        {
                final SupportType type = getType();
@@ -57,28 +57,28 @@ public class HBaseFieldInfo extends FieldInfo
                {
                case BOOLEAN:
                  return Bytes.toBytes( (Boolean)value );
-                 
+
                case SHORT:
                  return Bytes.toBytes( (Short)value );
-                 
+
                case INTEGER:
                  return Bytes.toBytes( (Integer)value );
-                 
+
                case LONG:
                  return Bytes.toBytes( (Long)value );
-                 
+
                case FLOAT:
                  return Bytes.toBytes( (Float)value );
-                 
+
                case DOUBLE:
                  return Bytes.toBytes( (Double)value );
-                 
+
                case STRING:
                  return Bytes.toBytes( (String)value );
                }
                throw new IllegalArgumentException( "Unsupported type: " + type 
);
        }
-       
+
        public Object toValue( byte[] bytes )
        {
     final SupportType type = getType();
@@ -86,26 +86,26 @@ public class HBaseFieldInfo extends FieldInfo
     {
     case BOOLEAN:
       return Bytes.toBoolean( bytes );
-      
+
     case SHORT:
       return Bytes.toShort( bytes );
-      
+
     case INTEGER:
       return Bytes.toInt( bytes );
-      
+
     case LONG:
       return Bytes.toLong( bytes );
-      
+
     case FLOAT:
       return Bytes.toFloat( bytes );
-      
+
     case DOUBLE:
       return Bytes.toDouble( bytes );
-      
+
     case STRING:
       return Bytes.toString( bytes );
     }
     throw new IllegalArgumentException( "Unsupported type: " + type );
   }
-       
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index fc11bf7..1218f4a 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -202,7 +202,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
   protected abstract void emitTuple(Message message);
 
   /**
-   * Concrete class derived from KafkaInputOpertor should implement this 
method if it wants to access kafka offset and partitionId along with kafka 
message. 
+   * Concrete class derived from KafkaInputOpertor should implement this 
method if it wants to access kafka offset and partitionId along with kafka 
message.
    */
   protected void emitTuple(KafkaConsumer.KafkaMessage message)
   {
@@ -524,7 +524,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
     Set<Integer> deletedOperators = Sets.newHashSet();
     Collection<Partition<AbstractKafkaInputOperator<K>>> resultPartitions = 
partitions;
     boolean numPartitionsChanged = false;
-    
+
     switch (strategy) {
 
     // For the 1 to 1 mapping The framework will create number of operator 
partitions based on kafka topic partitions
@@ -617,7 +617,7 @@ public abstract class AbstractKafkaInputOperator<K extends 
KafkaConsumer> implem
     default:
       break;
     }
-  
+
     if (numPartitionsChanged) {
       List<WindowDataManager> managers = 
windowDataManager.partition(resultPartitions.size(), deletedOperators);
       int i = 0;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
index 5b9c5ed..85cee56 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
@@ -123,11 +123,11 @@ public class HighlevelKafkaConsumer extends KafkaConsumer
       Properties config = new Properties();
       config.putAll(consumerConfig);
       config.setProperty("zookeeper.connect", 
zookeeperMap.get(cluster).iterator().next());
-      // create consumer connector will start a daemon thread to monitor the 
metadata change 
-      // we want to start this thread until the operator is activated 
+      // create consumer connector will start a daemon thread to monitor the 
metadata change
+      // we want to start this thread until the operator is activated
       standardConsumer.put(cluster, 
kafka.consumer.Consumer.createJavaConsumerConnector(new 
ConsumerConfig(config)));
     }
-    
+
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
 
     if (numStream == null || numStream.size() == 0) {
@@ -232,5 +232,5 @@ public class HighlevelKafkaConsumer extends KafkaConsumer
     // offset is not useful for high-level kafka consumer
     throw new UnsupportedOperationException("Offset request is currently not 
supported for high-level consumer");
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java
index a86a205..9954eb3 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java
@@ -26,7 +26,7 @@ import java.io.Serializable;
 public class KafkaPartition implements Serializable
 {
   protected static final String DEFAULT_CLUSTERID = 
"com.datatorrent.contrib.kafka.defaultcluster";
-  
+
   @SuppressWarnings("unused")
   private KafkaPartition()
   {
@@ -46,15 +46,15 @@ public class KafkaPartition implements Serializable
   }
 
   /**
-   * 
+   *
    */
   private static final long serialVersionUID = 7556802229202221546L;
-  
+
 
   private String clusterId;
-  
+
   private int partitionId;
-  
+
   private String topic;
 
   public String getClusterId()
@@ -128,7 +128,7 @@ public class KafkaPartition implements Serializable
   {
     return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + 
partitionId + ", topic=" + topic + "]";
   }
-  
-  
-  
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
index 5eb0575..0dee11e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
@@ -33,11 +33,11 @@ public interface OffsetManager
 //
 
   /**
-   * 
+   *
    * Load initial offsets for all kafka partition
    * <br>
    * The method is called at the first attempt of creating partitions and the 
return value is used as initial offset for simple consumer
-   * 
+   *
    * @return Map of Kafka KafkaPartition as key and long offset as value
    */
   public Map<KafkaPartition, Long> loadInitialOffsets();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index fb89389..4db1d69 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -69,7 +69,7 @@ import kafka.message.MessageAndOffset;
  * <br>
  *
  * Load balance: <br>
- * <li>The consumer create several data-consuming threads to consume the data 
from broker(s)</li> 
+ * <li>The consumer create several data-consuming threads to consume the data 
from broker(s)</li>
  * <li>Each thread has only ONE kafka client connecting to ONE broker to 
consume data from for multiple partitions </li>
  * <li>
  * There is ONE separate thread to monitor the leadership for all the 
partitions of the topic at every
@@ -89,7 +89,7 @@ public class SimpleKafkaConsumer extends KafkaConsumer
 {
 
   /**
-   * The data-consuming thread that use one simple kafka client to connect to 
one broker which is the leader of the partition(s) that this consumer is 
interested 
+   * The data-consuming thread that use one simple kafka client to connect to 
one broker which is the leader of the partition(s) that this consumer is 
interested
    */
   static final class ConsumerThread implements Runnable
   {
@@ -161,7 +161,7 @@ public class SimpleKafkaConsumer extends KafkaConsumer
               KafkaPartition kafkaPartition = iterator.next();
               short errorCode = fetchResponse.errorCode(consumer.topic, 
kafkaPartition.getPartitionId());
               if (fetchResponse.hasError() && errorCode != 
ErrorMapping.NoError()) {
-                // Kick off partition(s) which has error when fetch from this 
broker temporarily 
+                // Kick off partition(s) which has error when fetch from this 
broker temporarily
                 // Monitor will find out which broker it goes in monitor thread
                 logger.warn("Error when consuming topic {} from broker {} with 
error {} ", kafkaPartition, broker,
                   ErrorMapping.exceptionFor(errorCode));
@@ -177,7 +177,7 @@ public class SimpleKafkaConsumer extends KafkaConsumer
                 consumer.partitionToBroker.remove(kafkaPartition);
                 consumer.stats.updatePartitionStats(kafkaPartition, -1, "");
                 continue;
-              } 
+              }
               // If the fetchResponse either has no error or the no error for 
$kafkaPartition get the data
               long offset = -1l;
               for (MessageAndOffset msg : 
fetchResponse.messageSet(consumer.topic, kafkaPartition.getPartitionId())) {
@@ -200,7 +200,7 @@ public class SimpleKafkaConsumer extends KafkaConsumer
           // Update consumer that these partitions are currently stop being 
consumed because of some unrecoverable exception
           consumer.partitionToBroker.remove(kpForConsumer);
         }
-        
+
         logger.info("Exit the consumer thread for broker {} ", broker);
       }
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
 
b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
index 7be453a..43fc62a 100644
--- 
a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
+++ 
b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
@@ -69,14 +69,14 @@ public abstract class AbstractKinesisOutputOperator<V, T> 
implements Operator
    * @return
    */
   protected abstract byte[] getRecord(V value);
-  
+
   /**
    * convert tuple to pair of key and value. the key will be used as 
PartitionKey, and the value used as Data
    * @param tuple
    * @return
    */
   protected abstract Pair<String, V> tupleToKeyValue(T tuple);
-  
+
   List<PutRecordsRequestEntry> putRecordsRequestEntryList = new 
ArrayList<PutRecordsRequestEntry>();
   // Max size of each record: 50KB, Max size of putRecords: 4.5MB
   // So, default capacity would be 4.5MB/50KB = 92
@@ -145,7 +145,7 @@ public abstract class AbstractKinesisOutputOperator<V, T> 
implements Operator
     {
       processTuple( tuple );
     }
-    
+
   };
 
   public void processTuple(T tuple)
@@ -169,15 +169,15 @@ public abstract class AbstractKinesisOutputOperator<V, T> 
implements Operator
         requestRecord.setData(ByteBuffer.wrap(getRecord(keyValue.second)));
 
         client.putRecord(requestRecord);
-        
+
       }
       sendCount++;
     } catch (AmazonClientException e) {
       throw new RuntimeException(e);
     }
   }
-  
-  
+
+
   private void addRecord(T tuple)
   {
     try {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java b/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java
index 973f1ee..1465f03 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java
@@ -52,7 +52,7 @@ public class MemcacheStore implements KeyValueStore
   {
     serverAddresses.add(addr);
   }
-  
+
 
   public List<InetSocketAddress> getServerAddresses()
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java b/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java
index ee16786..e8f52df 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java
@@ -294,7 +294,7 @@ public class MqttClientConfig
 
   /**
    * Sets the port
-   * 
+   *
    * @param port the port
    */
   public void setPort(int port)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java b/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
index 1be2f0d..c4eefff 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
@@ -92,7 +92,7 @@ public abstract class AbstractParquetFileReader<T> extends 
AbstractFileInputOper
    * Derived classes need to provide an implementation to convert a Parquet
    * Group to any other type. Each Parquet record is read as a <b>Group</b>
    * (parquet.example.data.Group) and is passed onto this method.
-   * 
+   *
    * @param group
    *          Parquet record represented as a Group
    * @return object of type T
@@ -101,7 +101,7 @@ public abstract class AbstractParquetFileReader<T> extends 
AbstractFileInputOper
 
   /**
    * Get Parquet Schema as a String
-   * 
+   *
    * @return parquetSchema Parquet Schema as a string.
    */
   public String getParquetSchema()
@@ -111,7 +111,7 @@ public abstract class AbstractParquetFileReader<T> extends 
AbstractFileInputOper
 
   /**
    * Set Parquet Schema as a String
-   * 
+   *
    * @param parquetSchema
    *          Parquet Schema as a string
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java b/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
index 8834c18..37bd60b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
@@ -98,7 +98,7 @@ public class ParquetFilePOJOReader extends 
AbstractParquetFileReader<Object>
    * Converts a Parquet <b>Group</b>(parquet.example.data.Group) to a POJO.
    * Supported parquet primitive types are BOOLEAN, INT32, INT64, FLOAT, DOUBLE
    * and BINARY
-   * 
+   *
    * @throws ParquetEncodingException
    *           if group contains unsupported type
    */
@@ -167,7 +167,7 @@ public class ParquetFilePOJOReader extends 
AbstractParquetFileReader<Object>
   /**
    * Initializes {@link #activeFieldInfos} by adding fields represented by
    * fieldMapping
-   * 
+   *
    * @param fieldMapping
    *          String representing Parquet field name TO POJO field field name
    *          mapping
@@ -213,7 +213,7 @@ public class ParquetFilePOJOReader extends 
AbstractParquetFileReader<Object>
 
   /**
    * Returns String containing Parquet field name to POJO field name mapping
-   * 
+   *
    * @return parquetToPOJOFieldsMapping String representing Parquet field name
    *         TO POJO field name mapping
    */
@@ -230,7 +230,7 @@ public class ParquetFilePOJOReader extends 
AbstractParquetFileReader<Object>
    * long_id_v2:
    * LONG,css_file_loaded:css_file_loaded_v2:BOOLEAN,float_val:float_val_v2:
    * FLOAT,double_val:double_val_v2:DOUBLE
-   * 
+   *
    * @param parquetToPOJOFieldsMapping
    *          String representing Parquet field name TO POJO field name mapping
    */
@@ -261,7 +261,7 @@ public class ParquetFilePOJOReader extends 
AbstractParquetFileReader<Object>
   /**
    * Use reflection to generate field info values if the user has not provided
    * the inputs mapping.
-   * 
+   *
    * @return String representing the Parquet field name to POJO field name
    *         mapping
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
index bf8f85d..e7840aa 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
@@ -50,7 +50,7 @@ import 
com.datatorrent.contrib.parser.DelimitedSchema.FieldType;
  * purpose and can be chained together with other processors to fully automate
  * all of the required conversions and constraint validation for a single
  * delimited record.
- * 
+ *
  *
  * @since 3.4.0
  */
@@ -59,7 +59,7 @@ public class CellProcessorBuilder
 
   /**
    * Method to get cell processors for given field type and constraints
-   * 
+   *
    * @param fieldType
    *          data type of the field
    * @param constraints
@@ -93,7 +93,7 @@ public class CellProcessorBuilder
    * Method to get cellprocessor for String with constraints. These constraints
    * are evaluated against the String field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to String
    * @return CellProcessor
@@ -135,7 +135,7 @@ public class CellProcessorBuilder
    * Method to get cellprocessor for Integer with constraints. These 
constraints
    * are evaluated against the Integer field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Integer
    * @return CellProcessor
@@ -170,7 +170,7 @@ public class CellProcessorBuilder
    * Method to get cellprocessor for Long with constraints. These constraints
    * are evaluated against the Long field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Long
    * @return CellProcessor
@@ -204,7 +204,7 @@ public class CellProcessorBuilder
    * Method to get cellprocessor for Float/Double with constraints. These
    * constraints are evaluated against the Float/Double field for which this
    * cellprocessor is defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Float/Double
    * @return CellProcessor
@@ -238,7 +238,7 @@ public class CellProcessorBuilder
    * Method to get cellprocessor for Boolean with constraints. These 
constraints
    * are evaluated against the Boolean field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Boolean
    * @return CellProcessor
@@ -267,7 +267,7 @@ public class CellProcessorBuilder
    * Method to get cellprocessor for Date with constraints. These constraints
    * are evaluated against the Date field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Date
    * @return CellProcessor
@@ -291,7 +291,7 @@ public class CellProcessorBuilder
    * Method to get cellprocessor for Char with constraints. These constraints
    * are evaluated against the Char field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Char
    * @return CellProcessor
@@ -316,7 +316,7 @@ public class CellProcessorBuilder
 
   /**
    * Get a Double Min Max cellprocessor.
-   * 
+   *
    * @param minValue
    *          minimum value.
    * @param maxValue
@@ -332,7 +332,7 @@ public class CellProcessorBuilder
 
   /**
    * Get a Long Min Max cellprocessor.
-   * 
+   *
    * @param minValue
    *          minimum value.
    * @param maxValue
@@ -348,7 +348,7 @@ public class CellProcessorBuilder
 
   /**
    * Get a Int Min Max cellprocessor.
-   * 
+   *
    * @param minValue
    *          minimum value.
    * @param maxValue
@@ -364,7 +364,7 @@ public class CellProcessorBuilder
 
   /**
    * Get Optional cellprocessor which means field is not mandatory.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor
@@ -379,7 +379,7 @@ public class CellProcessorBuilder
 
   /**
    * Get cellprocessor to parse String as Integer.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor
@@ -394,7 +394,7 @@ public class CellProcessorBuilder
 
   /**
    * Get cellprocessor to parse String as Long.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor
@@ -409,7 +409,7 @@ public class CellProcessorBuilder
 
   /**
    * Get cellprocessor to parse String as Double.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor
@@ -424,7 +424,7 @@ public class CellProcessorBuilder
 
   /**
    * Get cellprocessor to parse String as Character.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
index 4698821..ea406e9 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
@@ -65,7 +65,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  * <b>err</b>:tuples that do not confine to schema are emitted on this port as
  * KeyValPair<String,String><br>
  * Key being the tuple and Val being the reason.
- * 
+ *
  * @displayName CsvParser
  * @category Parsers
  * @tags csv pojo parser
@@ -229,7 +229,7 @@ public class CsvParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Get the schema
-   * 
+   *
    * @return
    */
   public String getSchema()
@@ -239,7 +239,7 @@ public class CsvParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Set the schema
-   * 
+   *
    * @param schema
    */
   public void setSchema(String schema)
@@ -249,7 +249,7 @@ public class CsvParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Get errorTupleCount
-   * 
+   *
    * @return errorTupleCount
    */
   @VisibleForTesting
@@ -260,7 +260,7 @@ public class CsvParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Get emittedObjectCount
-   * 
+   *
    * @return emittedObjectCount
    */
   @VisibleForTesting
@@ -271,7 +271,7 @@ public class CsvParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Get incomingTuplesCount
-   * 
+   *
    * @return incomingTuplesCount
    */
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
index eb86c15..29b2c92 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
@@ -175,7 +175,7 @@ public class DelimitedSchema
 
   /**
    * For a given json string, this method sets the field members
-   * 
+   *
    * @param json
    * @throws JSONException
    * @throws IOException
@@ -208,7 +208,7 @@ public class DelimitedSchema
 
   /**
    * Get the list of field names mentioned in schema
-   * 
+   *
    * @return fieldNames
    */
   public List<String> getFieldNames()
@@ -218,7 +218,7 @@ public class DelimitedSchema
 
   /**
    * Get the delimiter character
-   * 
+   *
    * @return delimiterChar
    */
   public int getDelimiterChar()
@@ -228,7 +228,7 @@ public class DelimitedSchema
 
   /**
    * Get the quoteChar
-   * 
+   *
    * @return quoteChar
    */
   public char getQuoteChar()
@@ -238,7 +238,7 @@ public class DelimitedSchema
 
   /**
    * Get the line delimiter
-   * 
+   *
    * @return lineDelimiter
    */
   public String getLineDelimiter()
@@ -255,7 +255,7 @@ public class DelimitedSchema
 
   /**
    * Get the list of Fields.
-   * 
+   *
    * @return fields
    */
   public List<Field> getFields()
@@ -266,7 +266,7 @@ public class DelimitedSchema
   /**
    * Objects of this class represents a particular field in the schema. Each
    * field has a name, type and a set of associated constraints.
-   * 
+   *
    */
   public class Field
   {
@@ -291,7 +291,7 @@ public class DelimitedSchema
 
     /**
      * Get the name of the field
-     * 
+     *
      * @return name
      */
     public String getName()
@@ -301,7 +301,7 @@ public class DelimitedSchema
 
     /**
      * Set the name of the field
-     * 
+     *
      * @param name
      */
     public void setName(String name)
@@ -311,7 +311,7 @@ public class DelimitedSchema
 
     /**
      * Get {@link FieldType}
-     * 
+     *
      * @return type
      */
     public FieldType getType()
@@ -321,7 +321,7 @@ public class DelimitedSchema
 
     /**
      * Set {@link FieldType}
-     * 
+     *
      * @param type
      */
     public void setType(FieldType type)
@@ -331,7 +331,7 @@ public class DelimitedSchema
 
     /**
      * Get the map of constraints associated with the field
-     * 
+     *
      * @return constraints
      */
     public Map<String, Object> getConstraints()
@@ -341,7 +341,7 @@ public class DelimitedSchema
 
     /**
      * Sets the map of constraints associated with the field
-     * 
+     *
      * @param constraints
      */
     public void setConstraints(Map<String, Object> constraints)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
index b6c3c4d..bb95f9c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
@@ -66,8 +66,8 @@ import com.datatorrent.netlet.util.DTThrowable;
  * <b>err</b>:tuples that do not confine to schema are emitted on this port as
  * KeyValPair<String,String><br>
  * Key being the tuple and Val being the reason.
- * 
- * 
+ *
+ *
  * @displayName JsonParser
  * @category Parsers
  * @tags json pojo parser
@@ -180,7 +180,7 @@ public class JsonParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Get jsonSchema contents as a string to be used during validation
-   * 
+   *
    * @return jsonSchema
    */
   public String getJsonSchema()
@@ -190,7 +190,7 @@ public class JsonParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Sets jsonSchema to be used during validation
-   * 
+   *
    * @param jsonSchema
    *          schema as a string
    */
@@ -201,7 +201,7 @@ public class JsonParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Get errorTupleCount
-   * 
+   *
    * @return errorTupleCount
    */
   @VisibleForTesting
@@ -212,7 +212,7 @@ public class JsonParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Get emittedObjectCount
-   * 
+   *
    * @return emittedObjectCount
    */
   @VisibleForTesting
@@ -223,7 +223,7 @@ public class JsonParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Get incomingTuplesCount
-   * 
+   *
    * @return incomingTuplesCount
    */
   @VisibleForTesting
@@ -234,7 +234,7 @@ public class JsonParser extends Parser<byte[], 
KeyValPair<String, String>>
 
   /**
    * Set schema.
-   * 
+   *
    * @param schema
    */
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java b/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java
index a1a23b7..3de0055 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java
@@ -30,7 +30,7 @@ import com.datatorrent.lib.db.Connectable;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
- * @since 2.1.0 
+ * @since 2.1.0
  */
 public class REngineConnectable implements Connectable
 {
@@ -57,7 +57,7 @@ public class REngineConnectable implements Connectable
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see com.datatorrent.lib.db.Connectable#connect()
    */
   @Override
@@ -82,7 +82,7 @@ public class REngineConnectable implements Connectable
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see com.datatorrent.lib.db.Connectable#disconnect()
    */
   @Override
@@ -95,7 +95,7 @@ public class REngineConnectable implements Connectable
 
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see com.datatorrent.lib.db.Connectable#isConnected()
    */
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java b/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java
index b562641..e171336 100755
--- a/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java
@@ -48,16 +48,16 @@ import com.datatorrent.netlet.util.DTThrowable;
  * 5. set the type of arguments being passed. This will be done in a Map <br>
  * 6. Send the data in the form of a tuple consisting of a key:value pair 
where, "key" represents the name of the
  *    argument "value" represents the actual value of the argument. A map of 
all the arguments is created and passed as
- *    input. <br> <br> 
- *    
+ *    input. <br> <br>
+ *
  *    The result will be returned on one of the output ports depending on the 
type of the return value.
  * <br> <br>
- * 
+ *
  * <b> Sample Usage Code : </b> oper is an object of type RScript. Create it 
by passing  <br> < name of the R script with
  * path from classpath>, < name of the function to be invoked>, < name of the 
return variable>);
  * <br> <br>
  * Map<String, RScript.REXP_TYPE> argTypeMap = new HashMap<String, 
RScript.REXP_TYPE>();  <br>
- * argTypeMap.put(< argument name>, RScript.< argument type in the form of 
REXP_TYPE>); <br> 
+ * argTypeMap.put(< argument name>, RScript.< argument type in the form of 
REXP_TYPE>); <br>
  * argTypeMap.put(< argument name>, RScript.< argument type in the form of 
REXP_TYPE>);  <br>
  * ...... <br>
  *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index 847602e..08157bc 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -101,7 +101,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
   protected transient Channel channel;
   protected transient TracingConsumer tracingConsumer;
   protected transient String cTag;
-  
+
   protected transient ArrayBlockingQueue<KeyValPair<Long,byte[]>> 
holdingBuffer;
   private WindowDataManager windowDataManager;
   protected final transient Map<Long, byte[]> currentWindowRecoveryState;
@@ -109,7 +109,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
   private transient final Set<Long> recoveredTags;
   private transient long currentWindowId;
   private transient int operatorContextId;
-  
+
   public AbstractRabbitMQInputOperator()
   {
     currentWindowRecoveryState = new HashMap<Long, byte[]>();
@@ -118,7 +118,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
     windowDataManager = new WindowDataManager.NoopWindowDataManager();
   }
 
-  
+
 /**
  * define a consumer which can asynchronously receive data,
  * and added to holdingBuffer
@@ -162,7 +162,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
         }
         return;
       }
-      
+
       // Acknowledgements are sent at the end of the window after adding to 
idempotency manager
       pendingAck.add(tag);
       holdingBuffer.add(new KeyValPair<Long, byte[]>(tag, body));
@@ -196,7 +196,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
   }
 
   @SuppressWarnings("unchecked")
-  private void replay(long windowId) {      
+  private void replay(long windowId) {
     Map<Long, byte[]> recoveredData;
     try {
       recoveredData = (Map<Long, 
byte[]>)this.windowDataManager.retrieve(windowId);
@@ -212,7 +212,7 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
     }
   }
 
-  
+
   @Override
   public void endWindow()
   {
@@ -221,25 +221,25 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
     KeyValPair<Long, byte[]> message;
     while ((message = holdingBuffer.poll()) != null) {
       currentWindowRecoveryState.put(message.getKey(), message.getValue());
-      emitTuple(message.getValue());      
+      emitTuple(message.getValue());
     }
-    
+
     try {
       this.windowDataManager.save(currentWindowRecoveryState, currentWindowId);
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
-    
+
     currentWindowRecoveryState.clear();
-    
+
     for (Long deliveryTag : pendingAck) {
       try {
         channel.basicAck(deliveryTag, false);
-      } catch (IOException e) {        
+      } catch (IOException e) {
         DTThrowable.rethrow(e);
       }
     }
-    
+
     pendingAck.clear();
   }
 
@@ -391,15 +391,15 @@ public abstract class AbstractRabbitMQInputOperator<T> 
implements
   {
     this.routingKey = routingKey;
   }
-  
+
   public WindowDataManager getWindowDataManager() {
     return windowDataManager;
   }
-  
+
   public void setWindowDataManager(WindowDataManager windowDataManager) {
     this.windowDataManager = windowDataManager;
   }
-  
+
 
 
 }

Reply via email to