Fix trailing whitespace.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/763d14fc
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/763d14fc
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/763d14fc

Branch: refs/heads/master
Commit: 763d14fca6b84fdda1b6853235e5d4b71ca87fca
Parents: 90b5c9b
Author: CI Support <[email protected]>
Authored: Mon Sep 26 20:36:22 2016 -0700
Committer: CI Support <[email protected]>
Committed: Mon Sep 26 20:36:22 2016 -0700

----------------------------------------------------------------------
 .../LogstreamWidgetOutputOperator.java | 2 +-
 .../benchmark/CouchBaseAppOutput.java | 2 +-
 ...nchmarkPartitionableKafkaOutputOperator.java | 2 +-
 .../state/ManagedStateBenchmarkApp.java | 2 +-
 .../benchmark/state/StoreOperator.java | 28 ++++----
 .../state/ManagedStateBenchmarkAppTester.java | 14 ++--
 .../contrib/avro/AvroFileInputOperator.java | 6 +-
 .../datatorrent/contrib/avro/AvroToPojo.java | 6 +-
 .../datatorrent/contrib/avro/PojoToAvro.java | 10 +--
 .../AbstractElasticSearchOutputOperator.java | 2 +-
 .../elasticsearch/ElasticSearchConnectable.java | 6 +-
 .../ElasticSearchMapInputOperator.java | 6 +-
 .../ElasticSearchMapOutputOperator.java | 10 +--
 .../ElasticSearchPercolatorStore.java | 12 ++--
 .../contrib/enrich/DelimitedFSLoader.java | 4 +-
 .../datatorrent/contrib/enrich/FSLoader.java | 2 +-
 .../contrib/enrich/FixedWidthFSLoader.java | 10 +--
 .../contrib/formatter/CsvFormatter.java | 6 +-
 .../geode/AbstractGeodeInputOperator.java | 4 +-
 .../geode/AbstractGeodeOutputOperator.java | 4 +-
 .../contrib/geode/GeodeCheckpointStore.java | 18 +++---
 .../geode/GeodeKeyValueStorageAgent.java | 2 +-
 .../contrib/geode/GeodePOJOOutputOperator.java | 2 +-
 .../datatorrent/contrib/geode/GeodeStore.java | 6 +-
 .../contrib/geode/RegionCreateFunction.java | 2 +-
 .../contrib/hbase/HBaseFieldInfo.java | 34 +++++-----
 .../kafka/AbstractKafkaInputOperator.java | 6 +-
 .../contrib/kafka/HighlevelKafkaConsumer.java | 8 +--
 .../contrib/kafka/KafkaPartition.java | 16 ++---
 .../contrib/kafka/OffsetManager.java | 4 +-
 .../contrib/kafka/SimpleKafkaConsumer.java | 10 +--
 .../kinesis/AbstractKinesisOutputOperator.java | 12 ++--
 .../contrib/memcache/MemcacheStore.java | 2 +-
 .../contrib/mqtt/MqttClientConfig.java | 2 +-
 .../parquet/AbstractParquetFileReader.java | 6 +-
 .../contrib/parquet/ParquetFilePOJOReader.java | 10 +--
 .../contrib/parser/CellProcessorBuilder.java | 34 +++++-----
 .../datatorrent/contrib/parser/CsvParser.java | 12 ++--
 .../contrib/parser/DelimitedSchema.java | 26 ++++----
 .../datatorrent/contrib/parser/JsonParser.java | 16 ++---
 .../contrib/r/REngineConnectable.java | 8 +--
 .../java/com/datatorrent/contrib/r/RScript.java | 8 +--
 .../rabbitmq/AbstractRabbitMQInputOperator.java | 30 ++++-----
 .../AbstractRabbitMQOutputOperator.java | 14 ++--
 .../rabbitmq/RabbitMQOutputOperator.java | 4 +-
 .../redis/AbstractRedisInputOperator.java | 4 +-
 .../redis/RedisKeyValueInputOperator.java | 2 +-
 .../redis/RedisMapAsValueInputOperator.java | 4 +-
 .../datatorrent/contrib/redis/RedisStore.java | 6 +-
 .../solr/AbstractSolrOutputOperator.java | 2 +-
 .../ConcurrentUpdateSolrServerConnector.java | 2 +-
 .../datatorrent/contrib/splunk/SplunkStore.java | 2 +-
 .../contrib/zmq/ZeroMQInputOperator.java | 2 +-
 .../apex/malhar/contrib/misc/math/Change.java | 4 +-
 .../contrib/misc/math/CompareExceptMap.java | 4 +-
 .../malhar/contrib/misc/math/ExceptMap.java | 2 +-
 .../apex/malhar/contrib/misc/math/Quotient.java | 2 +-
 .../malhar/contrib/misc/math/QuotientMap.java | 2 +-
 .../malhar/contrib/misc/math/SumCountMap.java | 6 +-
 .../contrib/parser/StreamingJsonParser.java | 8 +--
 .../contrib/couchbase/CouchBaseSetTest.java | 2 +-
 .../ElasticSearchOperatorTest.java | 6 +-
 .../ElasticSearchPercolateTest.java | 10 +--
 .../contrib/geode/GeodeCheckpointStoreTest.java | 2 +-
 .../hbase/HBasePOJOInputOperatorTest.java | 34 +++++-----
 .../contrib/hbase/HBasePOJOPutOperatorTest.java | 36 +++++------
 .../HBaseTransactionalPutOperatorTest.java | 12 ++--
 .../datatorrent/contrib/hbase/HBaseUtil.java | 8 +--
 .../contrib/helper/MessageQueueTestHelper.java | 2 +-
 .../KafkaExactlyOnceOutputOperatorTest.java | 20 +++---
 .../contrib/kafka/KafkaTestPartitioner.java | 2 +-
 .../contrib/kafka/KafkaTestProducer.java | 4 +-
 .../kinesis/KinesisOperatorTestBase.java | 12 ++--
 .../kinesis/KinesisOutputOperatorTest.java | 16 ++---
 .../KinesisStringOutputOperatorTest.java | 4 +-
 .../contrib/kinesis/KinesisTestConsumer.java | 26 ++++----
 .../memcache/MemcachePOJOOperatorTest.java | 20 +++---
 .../memsql/AbstractMemsqlInputOperatorTest.java | 2 +-
 .../RabbitMQOutputOperatorBenchmark.java | 2 +-
 .../contrib/redis/RedisInputOperatorTest.java | 2 +-
 .../contrib/splunk/SplunkInputOperatorTest.java | 2 +-
 .../splunk/SplunkTcpOutputOperatorTest.java | 2 +-
 .../util/FieldValueSerializableGenerator.java | 20 +++---
 .../contrib/util/POJOTupleGenerateOperator.java | 30 ++++-----
 .../com/datatorrent/contrib/util/TestPOJO.java | 22 +++----
 .../contrib/util/TupleCacheOutputOperator.java | 14 ++--
 .../util/TupleGenerateCacheOperator.java | 8 +--
 .../contrib/util/TupleGenerator.java | 20 +++---
 .../contrib/zmq/ZeroMQInputOperatorTest.java | 2 +-
 .../contrib/zmq/ZeroMQMessageGenerator.java | 6 +-
 .../contrib/zmq/ZeroMQMessageReceiver.java | 8 +--
 .../contrib/zmq/ZeroMQOutputOperatorTest.java | 4 +-
 .../streamquery/FullOuterJoinOperatorTest.java | 6 +-
 .../misc/streamquery/GroupByOperatorTest.java | 2 +-
 .../misc/streamquery/HavingOperatorTest.java | 2 +-
 .../misc/streamquery/InnerJoinOperatorTest.java | 2 +-
 .../streamquery/LeftOuterJoinOperatorTest.java | 6 +-
 .../streamquery/RightOuterJoinOperatorTest.java | 8 +--
 .../misc/streamquery/SelectTopOperatorTest.java | 6 +-
 .../advanced/BetweenConditionTest.java | 2 +-
 .../advanced/CompoundConditionTest.java | 2 +-
 .../streamquery/advanced/InConditionTest.java | 2 +-
 .../demos/machinedata/data/AverageData.java | 2 +-
 .../demos/machinedata/data/MachineInfo.java | 20 +++---
 .../demos/mobile/PhoneEntryOperator.java | 10 +--
 .../datatorrent/demos/pi/CalculatorTest.java | 2 +-
 .../hive/AbstractFSRollingOutputOperator.java | 2 +-
 .../datatorrent/contrib/hive/HiveOperator.java | 14 ++--
 .../apex/malhar/hive/HiveOutputModule.java | 52 +++++++--------
 .../malhar/kafka/AbstractKafkaPartitioner.java | 12 ++--
 .../apex/malhar/kafka/KafkaConsumerWrapper.java | 4 +-
 .../malhar/kafka/KafkaInputOperatorTest.java | 44 ++++++-------
 .../apex/malhar/kafka/KafkaTestPartitioner.java | 2 +-
 .../apex/malhar/kafka/KafkaTestProducer.java | 8 +--
 .../lib/appdata/gpo/SerdeMapPrimitive.java | 4 +-
 .../lib/appdata/query/WindowBoundedService.java | 2 +-
 .../schemas/DimensionalConfigurationSchema.java | 2 +-
 .../lib/appdata/schemas/DimensionalSchema.java | 14 ++--
 .../datatorrent/lib/appdata/schemas/Schema.java | 2 +-
 .../lib/appdata/schemas/SnapshotSchema.java | 2 +-
 .../snapshot/AbstractAppDataSnapshotServer.java | 18 +++---
 .../lib/bandwidth/BandwidthPartitioner.java | 4 +-
 .../com/datatorrent/lib/codec/package-info.java | 2 +-
 .../datatorrent/lib/converter/Converter.java | 4 +-
 .../com/datatorrent/lib/db/KeyValueStore.java | 2 +-
 .../db/jdbc/AbstractJdbcPOJOOutputOperator.java | 2 +-
 .../db/jdbc/AbstractJdbcPollInputOperator.java | 6 +-
 .../lib/db/jdbc/JdbcPollInputOperator.java | 2 +-
 .../datatorrent/lib/fileaccess/TFileImpl.java | 28 ++++----
 .../datatorrent/lib/fileaccess/TFileWriter.java | 6 +-
 .../datatorrent/lib/filter/FilterOperator.java | 4 +-
 .../datatorrent/lib/formatter/Formatter.java | 6 +-
 .../lib/formatter/JsonFormatter.java | 2 +-
 .../datatorrent/lib/formatter/XmlFormatter.java | 12 ++--
 .../lib/io/AbstractFTPInputOperator.java | 2 +-
 .../datatorrent/lib/io/block/BlockWriter.java | 8 +--
 .../datatorrent/lib/io/block/FSSliceReader.java | 2 +-
 .../lib/io/fs/AbstractFileSplitter.java | 2 +-
 .../io/fs/AbstractSingleFileOutputOperator.java | 12 ++--
 .../com/datatorrent/lib/io/fs/FileMerger.java | 2 +-
 .../com/datatorrent/lib/io/fs/FileStitcher.java | 26 ++++----
 .../lib/io/fs/FilterStreamCodec.java | 4 +-
 .../lib/io/fs/FilterStreamContext.java | 2 +-
 .../lib/io/fs/FilterStreamProvider.java | 14 ++--
 .../lib/io/fs/HDFSFileCopyModule.java | 12 ++--
 .../datatorrent/lib/io/fs/HDFSFileMerger.java | 6 +-
 .../com/datatorrent/lib/io/fs/Synchronizer.java | 2 +-
 .../AbstractJMSSinglePortOutputOperator.java | 2 +-
 .../com/datatorrent/lib/io/jms/JMSBase.java | 8 +--
 .../lib/math/AbstractAggregateCalc.java | 2 +-
 .../AbstractXmlKeyValueCartesianProduct.java | 4 +-
 .../java/com/datatorrent/lib/math/Division.java | 38 +++++------
 .../java/com/datatorrent/lib/math/Margin.java | 2 +-
 .../com/datatorrent/lib/math/MarginMap.java | 6 +-
 .../main/java/com/datatorrent/lib/math/Min.java | 4 +-
 .../lib/math/MultiplyByConstant.java | 10 +--
 .../datatorrent/lib/math/RunningAverage.java | 2 +-
 .../java/com/datatorrent/lib/math/Sigma.java | 2 +-
 .../math/SingleVariableAbstractCalculus.java | 2 +-
 .../main/java/com/datatorrent/lib/math/Sum.java | 2 +-
 .../math/XmlKeyValueStringCartesianProduct.java | 4 +-
 .../com/datatorrent/lib/math/package-info.java | 4 +-
 .../java/com/datatorrent/lib/parser/Parser.java | 6 +-
 .../com/datatorrent/lib/parser/XmlParser.java | 2 +-
 .../lib/projection/ProjectionOperator.java | 2 +-
 .../datatorrent/lib/script/ScriptOperator.java | 8 +--
 .../lib/util/AbstractKeyValueStorageAgent.java | 20 +++---
 .../lib/util/StorageAgentKeyValueStore.java | 10 +--
 .../com/datatorrent/lib/util/TableInfo.java | 4 +-
 .../java/com/datatorrent/lib/util/TopNSort.java | 2 +-
 .../com/datatorrent/lib/util/package-info.java | 2 +-
 .../malhar/lib/dedup/BoundedDedupOperator.java | 14 ++--
 .../aggregator/AbstractCompositeAggregator.java | 14 ++--
 .../AbstractCompositeAggregatorFactory.java | 2 +-
 .../AbstractIncrementalAggregator.java | 2 +-
 .../aggregator/AbstractTopBottomAggregator.java | 41 ++++++------
 .../aggregator/AggregatorRegistry.java | 24 +++----
 .../dimensions/aggregator/AggregatorUtils.java | 2 +-
 .../aggregator/CompositeAggregator.java | 10 +--
 .../aggregator/CompositeAggregatorFactory.java | 4 +-
 .../DefaultCompositeAggregatorFactory.java | 8 +--
 .../aggregator/TopBottomAggregatorFactory.java | 14 ++--
 .../apex/malhar/lib/fs/FSRecordReader.java | 10 +--
 .../malhar/lib/fs/FSRecordReaderModule.java | 22 +++----
 .../managed/IncrementalCheckpointManager.java | 2 +-
 .../state/spillable/TimeBasedPriorityQueue.java | 6 +-
 .../malhar/lib/wal/FSWindowDataManager.java | 12 ++--
 .../apex/malhar/lib/wal/FSWindowReplayWAL.java | 4 +-
 .../apex/malhar/lib/wal/FileSystemWAL.java | 2 +-
 .../malhar/lib/window/accumulation/Average.java | 8 +--
 .../malhar/lib/window/accumulation/Group.java | 8 +--
 .../malhar/lib/window/accumulation/Max.java | 14 ++--
 .../malhar/lib/window/accumulation/Min.java | 14 ++--
 .../window/accumulation/RemoveDuplicates.java | 8 +--
 .../lib/window/accumulation/SumDouble.java | 8 +--
 .../lib/window/accumulation/SumFloat.java | 8 +--
 .../malhar/lib/window/accumulation/SumInt.java | 8 +--
 .../malhar/lib/window/accumulation/SumLong.java | 8 +--
 .../apache/hadoop/io/file/tfile/DTBCFile.java | 68 ++++++++++----------
 .../tfile/ReusableByteArrayInputStream.java | 8 +--
 .../lib/algo/BottomNUnifierTest.java | 6 +-
 .../MapToKeyValuePairConverterTest.java | 14 ++--
 .../StringValueToNumberConverterForMapTest.java | 14 ++--
 .../lib/db/cache/CacheStoreTest.java | 4 +-
 .../datatorrent/lib/db/jdbc/JdbcIOAppTest.java | 2 +-
 .../lib/db/jdbc/JdbcOperatorTest.java | 6 +-
 .../com/datatorrent/lib/filter/FilterTest.java | 8 +--
 .../lib/formatter/XmlFormatterTest.java | 2 +-
 .../io/fs/AbstractFileInputOperatorTest.java | 2 +-
 .../io/fs/AbstractFileOutputOperatorTest.java | 24 +++----
 .../AbstractSingleFileOutputOperatorTest.java | 2 +-
 .../lib/io/fs/FastMergerDecisionMakerTest.java | 12 ++--
 .../lib/io/fs/FileSplitterInputTest.java | 24 +++----
 .../datatorrent/lib/io/fs/SynchronizerTest.java | 2 +-
 .../lib/io/fs/TailFsInputOperatorTest.java | 4 +-
 .../logs/FilteredLineToTokenHashMapTest.java | 2 +-
 .../lib/logs/LineToTokenArrayListTest.java | 4 +-
 .../lib/logs/LineToTokenHashMapTest.java | 2 +-
 .../lib/logs/LineTokenizerKeyValTest.java | 2 +-
 .../MultiWindowDimensionAggregationTest.java | 2 +-
 .../com/datatorrent/lib/math/MarginMapTest.java | 4 +-
 .../com/datatorrent/lib/math/SigmaTest.java | 2 +-
 .../lib/multiwindow/SortedMovingWindowTest.java | 32 ++++-----
 .../datatorrent/lib/parser/XmlParserTest.java | 2 +-
 .../lib/statistics/MeridianOperatorTest.java | 2 +-
 .../lib/statistics/ModeOperatorTest.java | 2 +-
 .../StandardDeviationOperatorTest.java | 4 +-
 .../statistics/WeightedMeanOperatorTest.java | 2 +-
 .../lib/stream/DevNullCounterTest.java | 4 +-
 .../com/datatorrent/lib/stream/DevNullTest.java | 2 +-
 .../lib/testbench/ActiveMQMessageGenerator.java | 2 +-
 .../lib/testbench/RandomEventGeneratorTest.java | 2 +-
 .../com/datatorrent/lib/util/TestUtils.java | 2 +-
 .../lib/dedup/DeduperPartitioningTest.java | 4 +-
 .../apex/malhar/lib/fs/FSRecordReaderTest.java | 4 +-
 .../lib/fs/GenericFileOutputOperatorTest.java | 6 +-
 .../SpillableByteArrayListMultimapImplTest.java | 6 +-
 .../SpillableComplexComponentImplTest.java | 2 +-
 .../spillable/SpillableSetMultimapImplTest.java | 6 +-
 .../malhar/lib/wal/FSWindowDataManagerTest.java | 14 ++--
 .../lib/window/accumulation/AverageTest.java | 2 +-
 .../lib/window/accumulation/FoldFnTest.java | 26 ++++----
 .../lib/window/accumulation/GroupTest.java | 2 +-
 .../malhar/lib/window/accumulation/MaxTest.java | 6 +-
 .../malhar/lib/window/accumulation/MinTest.java | 6 +-
 .../lib/window/accumulation/ReduceFnTest.java | 6 +-
 .../accumulation/RemoveDuplicatesTest.java | 2 +-
 .../malhar/lib/window/accumulation/SumTest.java | 8 +--
 .../lib/window/accumulation/TopNByKeyTest.java | 26 ++++----
 .../apache/hadoop/io/file/tfile/TestDTFile.java | 4 +-
 .../io/file/tfile/TestDTFileByteArrays.java | 4 +-
 .../io/file/tfile/TestTFileComparator2.java | 8 +--
 .../io/file/tfile/TestTFileComparators.java | 4 +-
 .../TestTFileJClassComparatorByteArrays.java | 6 +-
 .../file/tfile/TestTFileLzoCodecsStreams.java | 2 +-
 ...ileNoneCodecsJClassComparatorByteArrays.java | 4 +-
 .../hadoop/io/file/tfile/TestTFileSeek.java | 22 +++----
 .../file/tfile/TestTFileSeqFileComparison.java | 2 +-
 .../hadoop/io/file/tfile/TestTFileSplit.java | 12 ++--
 .../hadoop/io/file/tfile/TestTFileStreams.java | 6 +-
 .../FunctionOperator/FunctionOperatorTest.java | 12 ++--
 261 files changed, 1092 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java
----------------------------------------------------------------------
diff --git a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java
index 1b94532..29c92e6 100644
--- a/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java
+++ b/apps/logstream/src/main/java/com/datatorrent/apps/logstream/LogstreamWidgetOutputOperator.java
@@ -128,7 +128,7 @@ public class LogstreamWidgetOutputOperator extends WidgetOutputOperator
     {
       @SuppressWarnings("unchecked")
       HashMap<String, Object>[] result = (HashMap<String, Object>[])Array.newInstance(HashMap.class, topNMap.size());
-
+
       int j = 0;
       for (Entry<String, Number> e : topNMap.entrySet()) {
         result[j] = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
index 1a24984..f789d08 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/CouchBaseAppOutput.java
@@ -29,7 +29,7 @@ import com.datatorrent.api.annotation.ApplicationAnnotation;
 *
 * Application to benchmark the performance of couchbase output operator.
 * The number of tuples processed per second were around 20,000.
- *
+ *
 * @since 2.0.0
 */
@ApplicationAnnotation(name = "CouchBaseAppOutput")

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
index c60e99a..1126ac1 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
@@ -55,7 +55,7 @@ public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<Be
 {
   private static final Logger logger = LoggerFactory.getLogger(BenchmarkPartitionableKafkaOutputOperator.class);
-
+
   private String topic = "benchmark";

   @Min(1)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
index 7d9c3ba..eab02db 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkApp.java
@@ -53,7 +53,7 @@ public class ManagedStateBenchmarkApp implements StreamingApplication
   protected StoreOperator storeOperator;
   protected int timeRange = 1000 * 60; // one minute range of hot keys
-
+
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
index 3298543..2748c29 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/state/StoreOperator.java
@@ -48,7 +48,7 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
     UPDATESYNC,
     UPDATEASYNC
   }
-
+
   protected static final int numOfWindowPerStatistics = 10;

   //this is the store we are going to use
@@ -60,10 +60,10 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   protected long tupleCount = 0;
   protected int windowCountPerStatistics = 0;
   protected long statisticsBeginTime = 0;
-
+
   protected ExecMode execMode = ExecMode.INSERT;
   protected int timeRange = 1000 * 60;
-
+
   public final transient DefaultInputPort<KeyValPair<byte[], byte[]>> input = new DefaultInputPort<KeyValPair<byte[], byte[]>>()
   {
     @Override
@@ -102,9 +102,9 @@ public class StoreOperator extends BaseOperator implements Operator.CheckpointNo
   protected transient Queue<Future<Slice>> taskQueue = new LinkedList<Future<Slice>>();
   protected transient Map<Future<Slice>, KeyValPair<byte[], byte[]>> taskToPair = Maps.newHashMap();
-
+
   /**
-   * we verify 3 type of operation
+   * we verify 3 type of operation
    * @param tuple
   */
  protected void processTuple(KeyValPair<byte[], byte[]> tuple)
@@ -119,21 +119,21 @@
         store.getSync(getTimeByKey(tuple.getKey()), new Slice(tuple.getKey()));
         insertValueToStore(tuple);
         break;
-
+
       default: //insert
         insertValueToStore(tuple);
     }
   }
-
+
   protected long getTimeByKey(byte[] key)
   {
     long lKey = ByteBuffer.wrap(key).getLong();
     return lKey - (lKey % timeRange);
   }
-
+
   // give a barrier to avoid used up memory
   protected final int taskBarrier = 100000;
-
+
   /**
    * This method first send request of get to the state manager, then handle all the task(get) which already done and update the value.
    * @param tuple
@@ -143,14 +143,14 @@
     if (taskQueue.size() > taskBarrier) {
       //slow down to avoid too much task waiting.
       try {
-
+
         logger.info("Queue Size: {}, wait time(milli-seconds): {}", taskQueue.size(), taskQueue.size() / taskBarrier);
         Thread.sleep(taskQueue.size() / taskBarrier);
       } catch (Exception e) {
         //ignore
       }
     }
-
+
     //send request of get to the state manager and add to the taskQueue and taskToPair.
     //the reason of an extra taskQueue to make sure the tasks are ordered
     {
@@ -171,7 +171,7 @@
       insertValueToStore(taskToPair.remove(task));
     }
   }
-
+
   protected void insertValueToStore(KeyValPair<byte[], byte[]> tuple)
   {
     Slice key = new Slice(tuple.getKey());
@@ -232,7 +232,7 @@
   {
     return execMode.name();
   }
-
+
   public void setExecModeStr(String execModeStr)
   {
     //this method used for set configuration. so, use case-insensitive
@@ -252,5 +252,5 @@
   {
     this.timeRange = timeRange;
   }
-
+
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
index 93a7720..4435aad 100644
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
@@ -39,31 +39,31 @@ import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
 public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
 {
   public static final String basePath = "target/temp";
-
+
   @Before
   public void before()
   {
     FileUtil.fullyDelete(new File(basePath));
   }
-
+
   @Test
   public void testUpdateSync() throws Exception
   {
     test(ExecMode.UPDATESYNC);
   }
-
+
   @Test
   public void testUpdateAsync() throws Exception
   {
     test(ExecMode.UPDATEASYNC);
   }
-
+
   @Test
   public void testInsert() throws Exception
   {
     test(ExecMode.INSERT);
   }
-
+
   public void test(ExecMode exeMode) throws Exception
   {
     Configuration conf = new Configuration(false);
@@ -73,7 +73,7 @@
     super.populateDAG(dag, conf);
     storeOperator.execMode = exeMode;
-
+
     StreamingApplication app = new StreamingApplication()
     {
       @Override
@@ -92,7 +92,7 @@
   }
-
+
   @Override
   public String getStoreBasePath(Configuration conf)
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
index b03e31a..01e99d3 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java
@@ -50,7 +50,7 @@ import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
 * input file<br>
 * Users can add the {@link FSWindowDataManager}
 * to ensure exactly once semantics with a HDFS backed WAL.
- *
+ *
 * @displayName AvroFileInputOperator
 * @category Input
 * @tags fs, file,avro, input operator
@@ -81,7 +81,7 @@ public class AvroFileInputOperator extends AbstractFileInputOperator<GenericReco
   /**
    * Returns a input stream given a file path
-   *
+   *
    * @param path
    * @return InputStream
    * @throws IOException
@@ -101,7 +101,7 @@ public class AvroFileInputOperator extends AbstractFileInputOperator<GenericReco
   /**
    * Reads a GenericRecord from the given input stream<br>
    * Emits the FileName,Offset,Exception on the error port if its connected
-   *
+   *
    * @return GenericRecord
    */
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
index ad54491..1951c1e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java
@@ -140,7 +140,7 @@ public class AvroToPojo extends BaseOperator
   /**
    * Returns a POJO from a Generic Record
-   *
+   *
    * @return Object
    */
   @SuppressWarnings("unchecked")
@@ -220,7 +220,7 @@ public class AvroToPojo extends BaseOperator
   /**
    * Use reflection to generate field info values if the user has not provided
    * the inputs mapping
-   *
+   *
    * @return String representing the POJO field to Avro field mapping
    */
   private String generateFieldInfoInputs(Class<?> cls)
@@ -240,7 +240,7 @@ public class AvroToPojo extends BaseOperator
   /**
    * Creates a map representing fieldName in POJO:field in Generic Record:Data
    * type
-   *
+   *
    * @return List of FieldInfo
    */
   private List<FieldInfo> createFieldInfoMap(String str)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
index 5fd7ee2..2f8fb19 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/avro/PojoToAvro.java
@@ -97,7 +97,7 @@ public class PojoToAvro extends BaseOperator
   /**
    * Returns the schema string for Avro Generic Record
-   *
+   *
    * @return schemaString
    */
   public String getSchemaString()
@@ -115,7 +115,7 @@ public class PojoToAvro extends BaseOperator
   /**
    * Returns the schema object
-   *
+   *
    * @return schema
    */
   private Schema getSchema()
@@ -133,7 +133,7 @@ public class PojoToAvro extends BaseOperator
   /**
    * Returns the list for field names from provided Avro schema
-   *
+   *
    * @return List of Fields
    */
   private List<Field> getColumnNames()
@@ -151,7 +151,7 @@ public class PojoToAvro extends BaseOperator
   /**
    * This method generates the getters for provided field of a given class
-   *
+   *
    * @return Getter
    */
   private Getter<?, ?> generateGettersForField(Class<?> cls, String inputFieldName)
@@ -232,7 +232,7 @@ public class PojoToAvro extends BaseOperator
   /**
    * Returns a generic record mapping the POJO fields to provided schema
-   *
+   *
    * @return Generic Record
    */
   private GenericRecord getGenericRecord(Object tuple) throws Exception

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java
index 7753108..0282ae8 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/AbstractElasticSearchOutputOperator.java
@@ -57,7 +57,7 @@ import com.datatorrent.lib.db.AbstractStoreOutputOperator;
 * @displayName Elastic Search Output
 * @category Output
 * @tags elastic search
- *
+ *
 * @since 2.1.0
 */
public abstract class AbstractElasticSearchOutputOperator<T, S extends ElasticSearchConnectable> extends AbstractStoreOutputOperator<T, S>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java
index fdf4e62..34eca95 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchConnectable.java
@@ -90,7 +90,7 @@ public class ElasticSearchConnectable implements Connectable
   /*
    * (non-Javadoc)
-   *
+   *
    * @see com.datatorrent.lib.db.Connectable#connect()
    */
   @Override
@@ -102,7 +102,7 @@ public class ElasticSearchConnectable implements Connectable
   /*
    * (non-Javadoc)
-   *
+   *
    * @see com.datatorrent.lib.db.Connectable#disconnect()
    */
   @Override
@@ -115,7 +115,7 @@ public class ElasticSearchConnectable implements Connectable
   /*
    * (non-Javadoc)
-   *
+   *
    * @see com.datatorrent.lib.db.Connectable#isConnected()
    */
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java
index 024b098..dcbee9d 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapInputOperator.java
@@ -40,7 +40,7 @@ public abstract class ElasticSearchMapInputOperator<T extends Map<String, Object
   protected String type;

   /**
-   *
+   *
    */
   public ElasticSearchMapInputOperator()
   {
@@ -49,7 +49,7 @@ public abstract class ElasticSearchMapInputOperator<T extends Map<String, Object
   /**
    * {@link SearchRequestBuilder} properties which do not change for each window are set during operator initialization.
-   *
+   *
    * @see com.datatorrent.contrib.elasticsearch.AbstractElasticSearchInputOperator#setup(com.datatorrent.api.Context.OperatorContext)
    */
   @Override
@@ -61,7 +61,7 @@ public abstract class ElasticSearchMapInputOperator<T extends Map<String, Object
   /*
    * (non-Javadoc)
-   *
+   *
    * @see
    * com.datatorrent.contrib.elasticsearch.AbstractElasticSearchInputOperator#convertToTuple(org.elasticsearch.search
    * .SearchHit)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java
index 51a4688..8616938 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchMapOutputOperator.java
@@ -38,7 +38,7 @@ public class ElasticSearchMapOutputOperator<T extends Map<String, Object>> exten
   private String type;

   /**
-   *
+   *
    */
   public ElasticSearchMapOutputOperator()
   {
@@ -48,7 +48,7 @@ public class ElasticSearchMapOutputOperator<T extends Map<String, Object>> exten
   /*
    * (non-Javadoc)
-   *
+   *
    * @see
    * com.datatorrent.contrib.elasticsearch.AbstractElasticSearchOutputOperator#setSource(org.elasticsearch.action.index
    * .IndexRequestBuilder, java.lang.Object)
@@ -61,7 +61,7 @@ public class ElasticSearchMapOutputOperator<T extends Map<String, Object>> exten
   /*
    * (non-Javadoc)
-   *
+   *
    * @see com.datatorrent.contrib.elasticsearch.AbstractElasticSearchOutputOperator#getId(java.lang.Object)
    */
   @Override
@@ -103,7 +103,7 @@ public class ElasticSearchMapOutputOperator<T extends Map<String, Object>> exten
   /*
    * (non-Javadoc)
-   *
+   *
    * @see com.datatorrent.contrib.elasticsearch.AbstractElasticSearchOutputOperator#getIndexName(java.lang.Object)
    */
   @Override
@@ -128,5 +128,5 @@ public class ElasticSearchMapOutputOperator<T extends Map<String, Object>> exten
   protected String getType(T tuple)
   {
     return type;
-  }
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java
index 7d88336..c13c025 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/elasticsearch/ElasticSearchPercolatorStore.java
@@ -28,7 +28,7 @@ import org.elasticsearch.index.query.QueryBuilder;
 import com.datatorrent.netlet.util.DTThrowable;

 /**
- *
+ *
 * @since 2.1.0
 */
public class ElasticSearchPercolatorStore extends ElasticSearchConnectable
@@ -44,7 +44,7 @@ public class ElasticSearchPercolatorStore extends ElasticSearchConnectable
   public void registerPercolateQuery(String indexName, String queryName, QueryBuilder queryBuilder)
   {
     try {
-
+
       client.prepareIndex(indexName, PERCOLATOR_TYPE, queryName)
           .setSource(XContentFactory.jsonBuilder()
               .startObject()
@@ -52,22 +52,22 @@ public class ElasticSearchPercolatorStore extends ElasticSearchConnectable
               .endObject())
           .setRefresh(true)
           .execute().actionGet();
-
+
     } catch (IOException e) {
       DTThrowable.rethrow(e);
     }
   }
-
+
   public PercolateResponse percolate(String[] indexNames, String documentType, Object tuple){
     XContentBuilder docBuilder;
     try {
-
+
       docBuilder = XContentFactory.jsonBuilder().startObject();
       docBuilder.field("doc").startObject(); //This is needed to designate the document
      docBuilder.field("content", tuple);
      docBuilder.endObject(); //End of the doc field
      docBuilder.endObject();//End of the JSON root object
-
+
      return client.preparePercolate().setIndices(indexNames)
          .setDocumentType(documentType)
          .setSource(docBuilder)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
index 9fa7129..3121cf1 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/DelimitedFSLoader.java
@@ -146,7 +146,7 @@ public class DelimitedFSLoader extends FSLoader
   /**
    * Get the schema
-   *
+   *
    * @return
    */
   public String getSchema()
@@ -156,7 +156,7 @@ public class DelimitedFSLoader extends FSLoader
   /**
    * Set the schema
-   *
+   *
    * @param schema
    */
   public void setSchema(String schema)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
index 997243d..464fa99 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FSLoader.java
@@ -117,7 +117,7 @@ public abstract class FSLoader extends ReadOnlyBackup
    * a record. Concrete implementations override this method to parse a record
    * and convert it to Map of field names and values OR simply returns null to
    * skip the records.
-   *
+   *
    * @param line
    *          A single record from file
    * @return a map with field name and value. Null value if returned is ignored

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java b/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
index 8b7eac0..d37ce3e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/enrich/FixedWidthFSLoader.java
@@ -94,7 +94,7 @@ public class FixedWidthFSLoader extends FSLoader
   /**
    * Set to true if file has header
-   *
+   *
    * @param hasHeader
    *          Indicates whether first line of the file is a header. Default is
    *          false
@@ -106,7 +106,7 @@ public class FixedWidthFSLoader extends FSLoader
   /**
    * Gets the field description
-   *
+   *
    * @return fieldDescription. String specifying information related to fields
    *         in fixed-width file.
    */
@@ -117,7 +117,7 @@ public class FixedWidthFSLoader extends FSLoader
   /**
    * Sets fieldDescription
-   *
+   *
    * @param fieldDescription
    *          a String specifying information related to fields in fixed-width
    *          file. Format is [NAME]:[FIELD_TYPE]:[WIDTH]:[date format if
@@ -135,7 +135,7 @@ public class FixedWidthFSLoader extends FSLoader
   /**
    * Gets the character used for padding in the fixed-width file.Default is
    * white space (' ')
-   *
+   *
    * @return Padding character. Default is white space.
    */
   public char getPadding()
@@ -146,7 +146,7 @@ public class FixedWidthFSLoader extends FSLoader
   /**
    * Sets the character used for padding in fixed-width file.Default is white
    * space (' ')
-   *
+   *
    * @param padding
    *          Padding character. Default is white space.
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
index 34ba49c..2979b44 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/formatter/CsvFormatter.java
@@ -58,7 +58,7 @@ import com.datatorrent.netlet.util.DTThrowable;
 * <b>in</b>:input tuple as a POJO. Each tuple represents a record<br>
 * <b>out</b>:tuples are are converted to string are emitted on this port<br>
 * <b>err</b>:tuples that could not be converted are emitted on this port<br>
- *
+ *
 * @displayName CsvFormatter
 * @category Formatter
 * @tags pojo csv formatter
@@ -180,7 +180,7 @@ public class CsvFormatter extends Formatter<String>
   /**
    * Get the schema
-   *
+   *
    * @return schema
    */
   public String getSchema()
@@ -190,7 +190,7 @@ public class CsvFormatter extends Formatter<String>
   /**
    * Set the schema
-   *
+   *
    * @param schema
    */
   public void setSchema(String schema)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
index bc8c1f0..497e6e4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeInputOperator.java
@@ -25,14 +25,14 @@ import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator;
 * concrete operator should be created from this skeleton implementation.
 * <p>
 * </p>
- *
+ *
 * @displayName Abstract Geode Input
 * @category Input
 * @tags geode, key value
 *
 * @param <T>
 *          The tuple type.
- *
+ *
 *
 * @since 3.4.0
 */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
index 7b0e158..dd0bad2 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/AbstractGeodeOutputOperator.java
@@ -25,14 +25,14 @@ import com.datatorrent.lib.db.AbstractStoreOutputOperator;
 * operator should be created from this skeleton implementation.
 * <p>
 * </p>
- *
+ *
 * @displayName Abstract Geode Output
 * @category Output
 * @tags geode, key value
 *
 * @param <T>
 *          The tuple type.
- *
+ *
 *
 * @since 3.4.0
 */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
index edd07d9..2152b97 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeCheckpointStore.java
@@ -47,7 +47,7 @@ import java.util.Map.Entry;
 /**
 * Geode Store implementation of {@link StorageAgentKeyValueStore} Uses {@link Kryo}
 * serialization to store retrieve objects
- *
+ *
 *
 *
 * @since 3.4.0
@@ -56,7 +56,7 @@ public class GeodeCheckpointStore
     implements StorageAgentKeyValueStore, Serializable
 {

-  public static final String GET_KEYS_QUERY =
+  public static final String GET_KEYS_QUERY =
       "SELECT entry.key FROM /$[region}.entries entry WHERE entry.key LIKE '${operator.id}%'";

   private String geodeLocators;
@@ -82,7 +82,7 @@ public class GeodeCheckpointStore
   /**
    * Initializes Geode store by using locator connection string
-   *
+   *
    * @param locatorString
    */
   public GeodeCheckpointStore(String locatorString)
@@ -101,7 +101,7 @@ public class GeodeCheckpointStore
   /**
    * Get the Geode locator connection string
-   *
+   *
    * @return locator connection string
    */
   public String getGeodeLocators()
@@ -111,7 +111,7 @@ public class GeodeCheckpointStore
   /**
    * Sets the Geode locator string
-   *
+   *
    * @param geodeLocators
    */
   public void setGeodeLocators(String geodeLocators)
@@ -160,7 +160,7 @@ public class GeodeCheckpointStore
   /**
    * Creates a region
-   *
+   *
    */
   public synchronized void createRegion()
   {
@@ -185,7 +185,7 @@ public class GeodeCheckpointStore
   /**
    * Check if store is connected to configured Geode cluster or not
-   *
+   *
    * @return True is connected to Geode cluster and client cache is active
    */
   @Override
@@ -199,7 +199,7 @@ public class GeodeCheckpointStore
   /**
    * Return the value for specified key from Geode region
-   *
+   *
    * @return the value object
    */
   @Override
@@ -252,7 +252,7 @@ public class GeodeCheckpointStore
   /**
    * Get list for keys starting from provided key name
-   *
+   *
    * @return List of keys
    */
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
index fdfd4ce..691c2c1 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeKeyValueStorageAgent.java
@@ -27,7 +27,7 @@ import com.datatorrent.lib.util.AbstractKeyValueStorageAgent;
 /**
 * Storage Agent implementation which uses {@link GeodeCheckpointStore} for operator
 * checkpointing
- *
+ *
 *
 *
 * @since 3.4.0

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
index defaa54..c7d22c7 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodePOJOOutputOperator.java
@@ -30,7 +30,7 @@ import com.datatorrent.lib.util.TableInfo;
 * @displayName Geode Output Operator
 * @category Output
 * @tags pojo, geode
- *
+ *
 *
 * @since 3.4.0
 */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
index bdb7add..d345661 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/GeodeStore.java
@@ -51,14 +51,14 @@ import com.datatorrent.lib.db.KeyValueStore;
 * that provides reliable asynchronous event notifications and guaranteed message delivery.
 * Geode is a data management platform that provides real-time
 * , consistent access to data-intensive applications.
- *
+ *
 *
 * @since 3.4.0
 */
public class GeodeStore implements KeyValueStore, Serializable
{
  /**
-   *
+   *
   */
  private static final long serialVersionUID = -5076452548893319967L;
  private static final Logger logger = LoggerFactory.getLogger(GeodeStore.class);
@@ -198,7 +198,7 @@ public class GeodeStore implements KeyValueStore, Serializable
    try {
      return (getRegion().get(key));
    } catch (IOException ex) {
-      throw new RuntimeException("Exception while getting the object", ex);
+      throw new RuntimeException("Exception while getting the object", ex);
    }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java b/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
index bc808ad..9e948c4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/geode/RegionCreateFunction.java
@@ -33,7 +33,7 @@ import com.gemstone.gemfire.cache.execute.FunctionContext;

 /**
 * Function to create region dynamically through client API
- *
+ *
 *
 * @since 3.4.0
 */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java
index a43b893..6a34a91 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/HBaseFieldInfo.java
@@ -29,11 +29,11 @@ import com.datatorrent.lib.util.FieldInfo;
 public class HBaseFieldInfo extends FieldInfo
 {
   private String familyName;
-
+
   public HBaseFieldInfo()
   {
   }
-
+
   public HBaseFieldInfo( String columnName, String columnExpression, SupportType type, String familyName )
   {
     super( columnName, columnExpression, type );
@@ -49,7 +49,7 @@ public class HBaseFieldInfo extends FieldInfo
   {
     this.familyName = familyName;
   }
-
+
   public byte[] toBytes( Object value )
   {
     final SupportType type = getType();
@@ -57,28 +57,28 @@ public class HBaseFieldInfo extends FieldInfo
     {
       case BOOLEAN:
         return Bytes.toBytes( (Boolean)value );
-
+
       case SHORT:
         return Bytes.toBytes( (Short)value );
-
+
      case INTEGER:
        return Bytes.toBytes( (Integer)value );
-
+
      case LONG:
        return Bytes.toBytes( (Long)value );
-
+
      case FLOAT:
        return Bytes.toBytes( (Float)value );
-
+
      case DOUBLE:
        return Bytes.toBytes( (Double)value );
-
+
      case STRING:
        return Bytes.toBytes( (String)value );
     }
     throw new IllegalArgumentException( "Unsupported type: " + type );
   }
-
+
   public Object toValue( byte[] bytes )
   {
     final SupportType type = getType();
@@ -86,26 +86,26 @@ public class HBaseFieldInfo extends FieldInfo
     {
       case BOOLEAN:
         return Bytes.toBoolean( bytes );
-
+
       case SHORT:
         return Bytes.toShort( bytes );
-
+
       case INTEGER:
         return Bytes.toInt( bytes );
-
+
       case LONG:
         return Bytes.toLong( bytes );
-
+
       case FLOAT:
         return Bytes.toFloat( bytes );
-
+
       case DOUBLE:
         return Bytes.toDouble( bytes );
-
+
       case STRING:
         return Bytes.toString( bytes );
     }
     throw new IllegalArgumentException( "Unsupported type: " + type );
   }
-
+
 }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index fc11bf7..1218f4a 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -202,7 +202,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   protected abstract void emitTuple(Message message);

   /**
-   * Concrete class derived from KafkaInputOpertor should implement this method if it wants to access kafka offset and partitionId along with kafka message.
+   * Concrete class derived from KafkaInputOpertor should implement this method if it wants to access kafka offset and partitionId along with kafka message.
    */
   protected void emitTuple(KafkaConsumer.KafkaMessage message)
   {
@@ -524,7 +524,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     Set<Integer> deletedOperators = Sets.newHashSet();
     Collection<Partition<AbstractKafkaInputOperator<K>>> resultPartitions = partitions;
     boolean numPartitionsChanged = false;
-
+
     switch (strategy) {
       // For the 1 to 1 mapping The framework will create number of operator partitions based on kafka topic partitions
@@ -617,7 +617,7 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
       default:
         break;
     }
-
+
     if (numPartitionsChanged) {
       List<WindowDataManager> managers = windowDataManager.partition(resultPartitions.size(), deletedOperators);
       int i = 0;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
index 5b9c5ed..85cee56 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
@@ -123,11 +123,11 @@ public class HighlevelKafkaConsumer extends KafkaConsumer
       Properties config = new Properties();
       config.putAll(consumerConfig);
       config.setProperty("zookeeper.connect", zookeeperMap.get(cluster).iterator().next());
-      // create consumer connector will start a daemon thread to monitor the metadata change
-      // we want to start this thread until the operator is activated
+      // create consumer connector will start a daemon thread to monitor the metadata change
+      // we want to start this thread until the operator is activated
       standardConsumer.put(cluster, kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(config)));
     }
-
+
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

     if (numStream == null || numStream.size() == 0) {
@@ -232,5 +232,5 @@ public class HighlevelKafkaConsumer extends KafkaConsumer
     // offset is not useful for high-level kafka consumer
     throw new UnsupportedOperationException("Offset request is currently not supported for high-level consumer");
   }
-
+
 }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java index a86a205..9954eb3 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java @@ -26,7 +26,7 @@ import java.io.Serializable; public class KafkaPartition implements Serializable { protected static final String DEFAULT_CLUSTERID = "com.datatorrent.contrib.kafka.defaultcluster"; - + @SuppressWarnings("unused") private KafkaPartition() { @@ -46,15 +46,15 @@ public class KafkaPartition implements Serializable } /** - * + * */ private static final long serialVersionUID = 7556802229202221546L; - + private String clusterId; - + private int partitionId; - + private String topic; public String getClusterId() @@ -128,7 +128,7 @@ public class KafkaPartition implements Serializable { return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + partitionId + ", topic=" + topic + "]"; } - - - + + + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java index 5eb0575..0dee11e 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java @@ -33,11 +33,11 @@ public interface OffsetManager // /** - * + * * Load initial offsets for all kafka partition * <br> * The method is called at the first attempt of creating partitions and the return value is used as initial offset for simple consumer - * + * * @return Map of Kafka KafkaPartition as key and long offset as value */ public Map<KafkaPartition, Long> loadInitialOffsets(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java index fb89389..4db1d69 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java @@ -69,7 +69,7 @@ import kafka.message.MessageAndOffset; * <br> * * Load balance: <br> - * <li>The consumer create several data-consuming threads to consume the data from broker(s)</li> + * <li>The consumer create several data-consuming threads to consume the data from broker(s)</li> * <li>Each thread has only ONE kafka client connecting to ONE broker to consume data from for multiple partitions </li> * <li> * There is ONE separate thread to monitor the leadership for all the partitions of the topic at every @@ -89,7 +89,7 @@ public class SimpleKafkaConsumer extends KafkaConsumer { /** - * The data-consuming thread that use one simple kafka client to connect to one broker which is the leader of the partition(s) that this consumer is interested + * 
The data-consuming thread that use one simple kafka client to connect to one broker which is the leader of the partition(s) that this consumer is interested */ static final class ConsumerThread implements Runnable { @@ -161,7 +161,7 @@ public class SimpleKafkaConsumer extends KafkaConsumer KafkaPartition kafkaPartition = iterator.next(); short errorCode = fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()); if (fetchResponse.hasError() && errorCode != ErrorMapping.NoError()) { - // Kick off partition(s) which has error when fetch from this broker temporarily + // Kick off partition(s) which has error when fetch from this broker temporarily // Monitor will find out which broker it goes in monitor thread logger.warn("Error when consuming topic {} from broker {} with error {} ", kafkaPartition, broker, ErrorMapping.exceptionFor(errorCode)); @@ -177,7 +177,7 @@ public class SimpleKafkaConsumer extends KafkaConsumer consumer.partitionToBroker.remove(kafkaPartition); consumer.stats.updatePartitionStats(kafkaPartition, -1, ""); continue; - } + } // If the fetchResponse either has no error or the no error for $kafkaPartition get the data long offset = -1l; for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kafkaPartition.getPartitionId())) { @@ -200,7 +200,7 @@ public class SimpleKafkaConsumer extends KafkaConsumer // Update consumer that these partitions are currently stop being consumed because of some unrecoverable exception consumer.partitionToBroker.remove(kpForConsumer); } - + logger.info("Exit the consumer thread for broker {} ", broker); } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java ---------------------------------------------------------------------- diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java index 7be453a..43fc62a 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java @@ -69,14 +69,14 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator * @return */ protected abstract byte[] getRecord(V value); - + /** * convert tuple to pair of key and value. 
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
index 7be453a..43fc62a 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java
@@ -69,14 +69,14 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
    * @return
    */
   protected abstract byte[] getRecord(V value);
-  
+
   /**
    * convert tuple to pair of key and value. the key will be used as PartitionKey, and the value used as Data
    * @param tuple
    * @return
    */
   protected abstract Pair<String, V> tupleToKeyValue(T tuple);
-  
+
   List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<PutRecordsRequestEntry>();
   // Max size of each record: 50KB, Max size of putRecords: 4.5MB
   // So, default capacity would be 4.5MB/50KB = 92
@@ -145,7 +145,7 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
     {
       processTuple( tuple );
     }
-    
+
   };
 
   public void processTuple(T tuple)
@@ -169,15 +169,15 @@ public abstract class AbstractKinesisOutputOperator<V, T> implements Operator
         requestRecord.setData(ByteBuffer.wrap(getRecord(keyValue.second)));
 
         client.putRecord(requestRecord);
-        
+
       }
       sendCount++;
     } catch (AmazonClientException e) {
       throw new RuntimeException(e);
     }
   }
-  
-  
+
+
   private void addRecord(T tuple)
   {
     try {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java b/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java
index 973f1ee..1465f03 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/memcache/MemcacheStore.java
@@ -52,7 +52,7 @@ public class MemcacheStore implements KeyValueStore
   {
     serverAddresses.add(addr);
   }
-  
+
   public List<InetSocketAddress> getServerAddresses()
   {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java b/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java
index ee16786..e8f52df 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/mqtt/MqttClientConfig.java
@@ -294,7 +294,7 @@ public class MqttClientConfig
   /**
    * Sets the port
-   * 
+   *
    * @param port the port
    */
   public void setPort(int port)
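In the AbstractKinesisOutputOperator hunks above, the two abstract methods carry the whole conversion: tupleToKeyValue() picks the PartitionKey and payload, getRecord() serializes the payload. A sketch for plain String tuples (illustration, not part of the commit's diff); the Pair import and constructor are assumptions, since the hunks only show that Pair exposes a "second" member:

  import com.datatorrent.common.util.Pair; // assumed location of Pair

  public class StringKinesisOutputOperator extends AbstractKinesisOutputOperator<String, String>
  {
    @Override
    protected byte[] getRecord(String value)
    {
      return value.getBytes(); // becomes the record's Data blob
    }

    @Override
    protected Pair<String, String> tupleToKeyValue(String tuple)
    {
      // Use the tuple itself both as PartitionKey and as Data
      return new Pair<String, String>(tuple, tuple);
    }
  }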
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java b/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
index 1be2f0d..c4eefff 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parquet/AbstractParquetFileReader.java
@@ -92,7 +92,7 @@ public abstract class AbstractParquetFileReader<T> extends AbstractFileInputOper
    * Derived classes need to provide an implementation to convert a Parquet
    * Group to any other type. Each Parquet record is read as a <b>Group</b>
    * (parquet.example.data.Group) and is passed onto this method.
-   * 
+   *
    * @param group
    *          Parquet record represented as a Group
    * @return object of type T
@@ -101,7 +101,7 @@ public abstract class AbstractParquetFileReader<T> extends AbstractFileInputOper
   /**
    * Get Parquet Schema as a String
-   * 
+   *
    * @return parquetSchema Parquet Schema as a string.
    */
   public String getParquetSchema()
@@ -111,7 +111,7 @@ public abstract class AbstractParquetFileReader<T> extends AbstractFileInputOper
   /**
    * Set Parquet Schema as a String
-   * 
+   *
    * @param parquetSchema
    *          Parquet Schema as a string
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java b/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
index 8834c18..37bd60b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parquet/ParquetFilePOJOReader.java
@@ -98,7 +98,7 @@ public class ParquetFilePOJOReader extends AbstractParquetFileReader<Object>
    * Converts a Parquet <b>Group</b>(parquet.example.data.Group) to a POJO.
    * Supported parquet primitive types are BOOLEAN, INT32, INT64, FLOAT, DOUBLE
    * and BINARY
-   * 
+   *
    * @throws ParquetEncodingException
    *           if group contains unsupported type
    */
@@ -167,7 +167,7 @@ public class ParquetFilePOJOReader extends AbstractParquetFileReader<Object>
   /**
    * Initializes {@link #activeFieldInfos} by adding fields represented by
    * fieldMapping
-   * 
+   *
    * @param fieldMapping
    *          String representing Parquet field name TO POJO field field name
    *          mapping
@@ -213,7 +213,7 @@ public class ParquetFilePOJOReader extends AbstractParquetFileReader<Object>
   /**
    * Returns String containing Parquet field name to POJO field name mapping
-   * 
+   *
    * @return parquetToPOJOFieldsMapping String representing Parquet field name
    *         TO POJO field name mapping
    */
@@ -230,7 +230,7 @@ public class ParquetFilePOJOReader extends AbstractParquetFileReader<Object>
    * long_id_v2:
    * LONG,css_file_loaded:css_file_loaded_v2:BOOLEAN,float_val:float_val_v2:
    * FLOAT,double_val:double_val_v2:DOUBLE
-   * 
+   *
    * @param parquetToPOJOFieldsMapping
    *          String representing Parquet field name TO POJO field name mapping
    */
@@ -261,7 +261,7 @@ public class ParquetFilePOJOReader extends AbstractParquetFileReader<Object>
   /**
    * Use reflection to generate field info values if the user has not provided
    * the inputs mapping.
-   * 
+   *
    * @return String representing the Parquet field name to POJO field name
    *         mapping
    */
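The ParquetFilePOJOReader Javadoc above pins down the mapping grammar: comma-separated parquetField:pojoField:TYPE triples. Feeding the fragment quoted in the -230 hunk back into a reader would look like this (illustration, not part of the commit's diff); the setter name mirrors the getter shown in the -213 hunk and is an assumption:

  public ParquetFilePOJOReader configuredReader()
  {
    ParquetFilePOJOReader reader = new ParquetFilePOJOReader();
    // parquetField:pojoField:TYPE, comma separated; names echo the Javadoc above
    reader.setParquetToPOJOFieldsMapping(
        "long_id:long_id_v2:LONG,css_file_loaded:css_file_loaded_v2:BOOLEAN,"
        + "float_val:float_val_v2:FLOAT,double_val:double_val_v2:DOUBLE");
    return reader;
  }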
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
index bf8f85d..e7840aa 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CellProcessorBuilder.java
@@ -50,7 +50,7 @@ import com.datatorrent.contrib.parser.DelimitedSchema.FieldType;
  * purpose and can be chained together with other processors to fully automate
  * all of the required conversions and constraint validation for a single
  * delimited record.
- * 
+ *
  *
  * @since 3.4.0
  */
@@ -59,7 +59,7 @@ public class CellProcessorBuilder
   /**
    * Method to get cell processors for given field type and constraints
-   * 
+   *
    * @param fieldType
    *          data type of the field
    * @param constraints
@@ -93,7 +93,7 @@
    * Method to get cellprocessor for String with constraints. These constraints
    * are evaluated against the String field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to String
    * @return CellProcessor
@@ -135,7 +135,7 @@
    * Method to get cellprocessor for Integer with constraints. These constraints
    * are evaluated against the Integer field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Integer
    * @return CellProcessor
@@ -170,7 +170,7 @@
    * Method to get cellprocessor for Long with constraints. These constraints
    * are evaluated against the Long field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Long
    * @return CellProcessor
@@ -204,7 +204,7 @@
    * Method to get cellprocessor for Float/Double with constraints. These
    * constraints are evaluated against the Float/Double field for which this
    * cellprocessor is defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Float/Double
    * @return CellProcessor
@@ -238,7 +238,7 @@
    * Method to get cellprocessor for Boolean with constraints. These constraints
    * are evaluated against the Boolean field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Boolean
    * @return CellProcessor
@@ -267,7 +267,7 @@
    * Method to get cellprocessor for Date with constraints. These constraints
    * are evaluated against the Date field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Date
    * @return CellProcessor
@@ -291,7 +291,7 @@
    * Method to get cellprocessor for Char with constraints. These constraints
    * are evaluated against the Char field for which this cellprocessor is
    * defined.
-   * 
+   *
    * @param constraints
    *          map of constraints applicable to Char
    * @return CellProcessor
@@ -316,7 +316,7 @@
   /**
    * Get a Double Min Max cellprocessor.
-   * 
+   *
    * @param minValue
    *          minimum value.
    * @param maxValue
@@ -332,7 +332,7 @@
   /**
    * Get a Long Min Max cellprocessor.
-   * 
+   *
    * @param minValue
    *          minimum value.
    * @param maxValue
@@ -348,7 +348,7 @@
   /**
    * Get a Int Min Max cellprocessor.
-   * 
+   *
    * @param minValue
    *          minimum value.
    * @param maxValue
@@ -364,7 +364,7 @@
   /**
    * Get Optional cellprocessor which means field is not mandatory.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor
@@ -379,7 +379,7 @@
   /**
    * Get cellprocessor to parse String as Integer.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor
@@ -394,7 +394,7 @@
   /**
    * Get cellprocessor to parse String as Long.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor
@@ -409,7 +409,7 @@
   /**
    * Get cellprocessor to parse String as Double.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor
@@ -424,7 +424,7 @@
   /**
    * Get cellprocessor to parse String as Character.
-   * 
+   *
    * @param cellProcessor
    *          next processor in the chain.
    * @return CellProcessor

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
index 4698821..ea406e9 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/CsvParser.java
@@ -65,7 +65,7 @@ import com.datatorrent.netlet.util.DTThrowable;
  * <b>err</b>:tuples that do not confine to schema are emitted on this port as
  * KeyValPair<String,String><br>
  * Key being the tuple and Val being the reason.
- * 
+ *
  * @displayName CsvParser
  * @category Parsers
  * @tags csv pojo parser
@@ -229,7 +229,7 @@ public class CsvParser extends Parser<byte[], KeyValPair<String, String>>
   /**
    * Get the schema
-   * 
+   *
    * @return
    */
   public String getSchema()
@@ -239,7 +239,7 @@
   /**
    * Set the schema
-   * 
+   *
    * @param schema
    */
   public void setSchema(String schema)
@@ -249,7 +249,7 @@
   /**
    * Get errorTupleCount
-   * 
+   *
    * @return errorTupleCount
    */
   @VisibleForTesting
@@ -260,7 +260,7 @@
   /**
    * Get emittedObjectCount
-   * 
+   *
    * @return emittedObjectCount
    */
   @VisibleForTesting
@@ -271,7 +271,7 @@
   /**
    * Get incomingTuplesCount
-   * 
+   *
    * @return incomingTuplesCount
    */
   @VisibleForTesting
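Taken together with CellProcessorBuilder above, CsvParser is the operator a pipeline actually configures: the schema string drives which cell processors get built for each field. A minimal wiring sketch (illustration, not part of the commit's diff), using the setSchema() setter from the -239 hunk; a plausible schema string is sketched under DelimitedSchema below:

  public CsvParser configuredParser(String schemaJson)
  {
    CsvParser csvParser = new CsvParser();
    csvParser.setSchema(schemaJson); // setter shown in the -239 hunk above
    // Records that violate a field constraint leave on the err port as
    // KeyValPair<String, String> (tuple, reason), per the class Javadoc
    return csvParser;
  }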
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
index eb86c15..29b2c92 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/DelimitedSchema.java
@@ -175,7 +175,7 @@ public class DelimitedSchema
   /**
    * For a given json string, this method sets the field members
-   * 
+   *
    * @param json
    * @throws JSONException
    * @throws IOException
@@ -208,7 +208,7 @@
   /**
    * Get the list of field names mentioned in schema
-   * 
+   *
    * @return fieldNames
    */
   public List<String> getFieldNames()
@@ -218,7 +218,7 @@
   /**
    * Get the delimiter character
-   * 
+   *
    * @return delimiterChar
    */
   public int getDelimiterChar()
@@ -228,7 +228,7 @@
   /**
    * Get the quoteChar
-   * 
+   *
    * @return quoteChar
    */
   public char getQuoteChar()
@@ -238,7 +238,7 @@
   /**
    * Get the line delimiter
-   * 
+   *
    * @return lineDelimiter
    */
   public String getLineDelimiter()
@@ -255,7 +255,7 @@
   /**
    * Get the list of Fields.
-   * 
+   *
    * @return fields
    */
   public List<Field> getFields()
@@ -266,7 +266,7 @@
   /**
    * Objects of this class represents a particular field in the schema. Each
    * field has a name, type and a set of associated constraints.
-   * 
+   *
    */
   public class Field
   {
@@ -291,7 +291,7 @@
     /**
      * Get the name of the field
-     * 
+     *
      * @return name
      */
     public String getName()
@@ -301,7 +301,7 @@
     /**
      * Set the name of the field
-     * 
+     *
      * @param name
      */
     public void setName(String name)
@@ -311,7 +311,7 @@
     /**
      * Get {@link FieldType}
-     * 
+     *
      * @return type
      */
    public FieldType getType()
@@ -321,7 +321,7 @@
     /**
      * Set {@link FieldType}
-     * 
+     *
      * @param type
      */
     public void setType(FieldType type)
@@ -331,7 +331,7 @@
     /**
      * Get the map of constraints associated with the field
-     * 
+     *
      * @return constraints
      */
     public Map<String, Object> getConstraints()
@@ -341,7 +341,7 @@
     /**
      * Sets the map of constraints associated with the field
-     * 
+     *
      * @param constraints
      */
     public void setConstraints(Map<String, Object> constraints)
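The getters above imply the shape of the schema JSON that setSchema() consumes: a delimiter, a quote character, a line delimiter, and a list of fields, each with a name, a FieldType, and an optional constraint map. A plausible instance (illustration, not part of the commit's diff; the key names are assumptions inferred from those getters, since the JSON parsing code itself is not in this diff):

  static final String SCHEMA_JSON = "{"
      + "\"separator\": \",\","
      + "\"quoteChar\": \"\\\"\","
      + "\"lineDelimiter\": \"\\n\","
      + "\"fields\": [{"
      + "  \"name\": \"adId\", \"type\": \"Integer\","
      + "  \"constraints\": {\"required\": \"true\", \"minValue\": \"1\"}"
      + "}, {"
      + "  \"name\": \"campaign\", \"type\": \"String\""
      + "}]}";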
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
index b6c3c4d..bb95f9c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/JsonParser.java
@@ -66,8 +66,8 @@ import com.datatorrent.netlet.util.DTThrowable;
  * <b>err</b>:tuples that do not confine to schema are emitted on this port as
  * KeyValPair<String,String><br>
  * Key being the tuple and Val being the reason.
- * 
- * 
+ *
+ *
  * @displayName JsonParser
  * @category Parsers
  * @tags json pojo parser
@@ -180,7 +180,7 @@ public class JsonParser extends Parser<byte[], KeyValPair<String, String>>
   /**
    * Get jsonSchema contents as a string to be used during validation
-   * 
+   *
    * @return jsonSchema
    */
   public String getJsonSchema()
@@ -190,7 +190,7 @@
   /**
    * Sets jsonSchema to be used during validation
-   * 
+   *
    * @param jsonSchema
    *          schema as a string
    */
@@ -201,7 +201,7 @@
   /**
    * Get errorTupleCount
-   * 
+   *
    * @return errorTupleCount
    */
   @VisibleForTesting
@@ -212,7 +212,7 @@
   /**
    * Get emittedObjectCount
-   * 
+   *
    * @return emittedObjectCount
    */
   @VisibleForTesting
@@ -223,7 +223,7 @@
   /**
    * Get incomingTuplesCount
-   * 
+   *
    * @return incomingTuplesCount
    */
   @VisibleForTesting
@@ -234,7 +234,7 @@
   /**
    * Set schema.
-   * 
+   *
    * @param schema
    */
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java b/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java
index a1a23b7..3de0055 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/r/REngineConnectable.java
@@ -30,7 +30,7 @@ import com.datatorrent.lib.db.Connectable;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
- * @since 2.1.0 
+ * @since 2.1.0
  */
 public class REngineConnectable implements Connectable
 {
@@ -57,7 +57,7 @@ public class REngineConnectable implements Connectable
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see com.datatorrent.lib.db.Connectable#connect()
    */
   @Override
@@ -82,7 +82,7 @@ public class REngineConnectable implements Connectable
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see com.datatorrent.lib.db.Connectable#disconnect()
    */
   @Override
@@ -95,7 +95,7 @@ public class REngineConnectable implements Connectable
   /*
    * (non-Javadoc)
-   * 
+   *
    * @see com.datatorrent.lib.db.Connectable#isConnected()
    */
   @Override
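JsonParser above follows the same pattern as CsvParser but validates against a JSON schema instead of a delimited one. A small sketch (illustration, not part of the commit's diff) using the setter from the -190 hunk; the schema body is a hypothetical draft-04 fragment:

  public JsonParser configuredJsonParser()
  {
    JsonParser jsonParser = new JsonParser();
    // Tuples failing this validation leave on the err port as
    // KeyValPair<String, String> (tuple, reason), per the class Javadoc
    jsonParser.setJsonSchema(
        "{ \"type\": \"object\", \"required\": [\"adId\"],"
        + " \"properties\": { \"adId\": { \"type\": \"integer\" } } }");
    return jsonParser;
  }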
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java b/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java
index b562641..e171336 100755
--- a/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/r/RScript.java
@@ -48,16 +48,16 @@ import com.datatorrent.netlet.util.DTThrowable;
  * 5. set the type of arguments being passed. This will be done in a Map <br>
  * 6. Send the data in the form of a tuple consisting of a key:value pair where, "key" represents the name of the
  * argument "value" represents the actual value of the argument. A map of all the arguments is created and passed as
- * input. <br> <br> 
- * 
+ * input. <br> <br>
+ *
  * The result will be returned on one of the output ports depending on the type of the return value.
  * <br> <br>
- * 
+ *
  * <b> Sample Usage Code : </b> oper is an object of type RScript. Create it by passing <br> < name of the R script with
  * path from classpath>, < name of the function to be invoked>, < name of the return variable>);
  * <br> <br>
  * Map<String, RScript.REXP_TYPE> argTypeMap = new HashMap<String, RScript.REXP_TYPE>(); <br>
- * argTypeMap.put(< argument name>, RScript.< argument type in the form of REXP_TYPE>); <br> 
+ * argTypeMap.put(< argument name>, RScript.< argument type in the form of REXP_TYPE>); <br>
  * argTypeMap.put(< argument name>, RScript.< argument type in the form of REXP_TYPE>); <br>
  * ...... <br>
  *

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
index 847602e..08157bc 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java
@@ -101,7 +101,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
   protected transient Channel channel;
   protected transient TracingConsumer tracingConsumer;
   protected transient String cTag;
-  
+
   protected transient ArrayBlockingQueue<KeyValPair<Long,byte[]>> holdingBuffer;
   private WindowDataManager windowDataManager;
   protected final transient Map<Long, byte[]> currentWindowRecoveryState;
@@ -109,7 +109,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
   private transient final Set<Long> recoveredTags;
   private transient long currentWindowId;
   private transient int operatorContextId;
-  
+
   public AbstractRabbitMQInputOperator()
   {
     currentWindowRecoveryState = new HashMap<Long, byte[]>();
@@ -118,7 +118,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
     windowDataManager = new WindowDataManager.NoopWindowDataManager();
   }
 
-  
+
   /**
    * define a consumer which can asynchronously receive data,
    * and added to holdingBuffer
@@ -162,7 +162,7 @@ public abstract class AbstractRabbitMQInputOperator<T> implements
         }
         return;
       }
-      
+
       // Acknowledgements are sent at the end of the window after adding to idempotency manager
       pendingAck.add(tag);
       holdingBuffer.add(new KeyValPair<Long, byte[]>(tag, body));
@@ -196,7 +196,7 @@
   }
 
   @SuppressWarnings("unchecked")
-  private void replay(long windowId) { 
+  private void replay(long windowId) {
    Map<Long, byte[]> recoveredData;
    try {
      recoveredData = (Map<Long, byte[]>)this.windowDataManager.retrieve(windowId);
@@ -212,7 +212,7 @@
     }
   }
 
-  
+
   @Override
   public void endWindow()
   {
@@ -221,25 +221,25 @@
     KeyValPair<Long, byte[]> message;
     while ((message = holdingBuffer.poll()) != null) {
       currentWindowRecoveryState.put(message.getKey(), message.getValue());
-      emitTuple(message.getValue()); 
+      emitTuple(message.getValue());
     }
-    
+
     try {
       this.windowDataManager.save(currentWindowRecoveryState, currentWindowId);
     } catch (IOException e) {
      DTThrowable.rethrow(e);
    }
-    
+
     currentWindowRecoveryState.clear();
-    
+
     for (Long deliveryTag : pendingAck) {
       try {
         channel.basicAck(deliveryTag, false);
-      } catch (IOException e) { 
+      } catch (IOException e) {
         DTThrowable.rethrow(e);
       }
     }
-    
+
     pendingAck.clear();
   }
@@ -391,15 +391,15 @@
   {
     this.routingKey = routingKey;
   }
-  
+
   public WindowDataManager getWindowDataManager()
   {
     return windowDataManager;
   }
-  
+
   public void setWindowDataManager(WindowDataManager windowDataManager)
   {
     this.windowDataManager = windowDataManager;
   }
-  
+
 }
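The RabbitMQ hunks above show why windowDataManager matters: endWindow() saves each window's delivery tags and bodies, replay() re-emits them after a restart, and basicAck() is deferred to endWindow(). The constructor defaults to NoopWindowDataManager, which disables that recovery, so a durable manager must be set explicitly via the setter from the -391 hunk. A sketch (illustration, not part of the commit's diff), assuming a concrete subclass named RabbitMQInputOperator and Malhar's FSWindowDataManager, neither of which appears in this diff:

  import org.apache.apex.malhar.lib.wal.FSWindowDataManager;

  public class RecoverySetup
  {
    public void configure(RabbitMQInputOperator rabbitIn) // hypothetical concrete subclass
    {
      // Persist per-window recovery state so replay() can restore tuples
      // after a crash, instead of the no-op default set in the constructor
      rabbitIn.setWindowDataManager(new FSWindowDataManager());
    }
  }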
