http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java ---------------------------------------------------------------------- diff --git a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java index 8e12fcc..21079d7 100644 --- a/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java +++ b/demos/pi/src/test/java/com/datatorrent/demos/pi/CalculatorTest.java @@ -30,7 +30,7 @@ public class CalculatorTest { @Test public void testSomeMethod() throws Exception - { + { LocalMode lma = LocalMode.newInstance(); Configuration conf = new Configuration(false); conf.addResource("dt-site-pilibrary.xml");
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java index 6360768..3c9c4da 100755 --- a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java +++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java @@ -210,7 +210,7 @@ public abstract class AbstractFSRollingOutputOperator<T> extends AbstractFileOut * written to. Example: If hive partitions are date='2014-12-12',country='USA' * then this method returns {"2014-12-12","USA"} The implementation is left to * the user. - * + * * @param tuple * A received tuple to be written to a hive partition. * @return ArrayList containing hive partition values. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java index 8e3b143..ed4ca85 100755 --- a/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java +++ b/hive/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java @@ -61,7 +61,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi /** * Hive store. - * + * * @deprecated use {@link AbstractStoreOutputOperator#store} instead */ @Deprecated @@ -226,7 +226,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi /** * Get the partition columns in hive to which data needs to be loaded. - * + * * @return List of Hive Partition Columns */ public ArrayList<String> getHivePartitionColumns() @@ -236,7 +236,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi /** * Set the hive partition columns to which data needs to be loaded. - * + * * @param hivePartitionColumns */ public void setHivePartitionColumns(ArrayList<String> hivePartitionColumns) @@ -246,7 +246,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi /** * Get the table name in hive. - * + * * @return table name */ public String getTablename() @@ -256,7 +256,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi /** * Set the table name in hive. - * + * * @param tablename */ public void setTablename(String tablename) @@ -266,7 +266,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi /** * Gets the store set for hive; - * + * * @deprecated use {@link #getStore()} instead. * @return hive store */ @@ -278,7 +278,7 @@ public class HiveOperator extends AbstractStoreOutputOperator<FilePartitionMappi /** * Set the store in hive. - * + * * @deprecated use {@link #setStore()} instead. * @param hivestore */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java ---------------------------------------------------------------------- diff --git a/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java index 3491b3c..d859634 100644 --- a/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java +++ b/hive/src/main/java/org/apache/apex/malhar/hive/HiveOutputModule.java @@ -164,7 +164,7 @@ public class HiveOutputModule implements Module /** * The path of the directory to where files are written. - * + * * @return file path */ public String getFilePath() @@ -174,7 +174,7 @@ public class HiveOutputModule implements Module /** * The path of the directory to where files are written. - * + * * @param filePath * file path */ @@ -185,7 +185,7 @@ public class HiveOutputModule implements Module /** * Names of the columns in hive table (excluding partitioning columns). - * + * * @return Hive column names */ public String[] getHiveColumns() @@ -195,7 +195,7 @@ public class HiveOutputModule implements Module /** * Names of the columns in hive table (excluding partitioning columns). - * + * * @param hiveColumns * Hive column names */ @@ -207,7 +207,7 @@ public class HiveOutputModule implements Module /** * Data types of the columns in hive table (excluding partitioning columns). * This sequence should match to the fields in hiveColumnDataTypes - * + * * @return Hive column data types */ public FIELD_TYPE[] getHiveColumnDataTypes() @@ -218,7 +218,7 @@ public class HiveOutputModule implements Module /** * Data types of the columns in hive table (excluding partitioning columns). * This sequence should match to the fields in hiveColumnDataTypes * - * + * * @param hiveColumnDataTypes * Hive column data types */ @@ -230,7 +230,7 @@ public class HiveOutputModule implements Module /** * Expressions for the hive columns (excluding partitioning columns). This * sequence should match to the fields in hiveColumnDataTypes - * + * * @return */ public String[] getExpressionsForHiveColumns() @@ -241,7 +241,7 @@ public class HiveOutputModule implements Module /** * Expressions for the hive columns (excluding partitioning columns). This * sequence should match to the fields in hiveColumnDataTypes - * + * * @param expressionsForHiveColumns */ public void setExpressionsForHiveColumns(String[] expressionsForHiveColumns) @@ -251,7 +251,7 @@ public class HiveOutputModule implements Module /** * Names of the columns on which hive data should be partitioned - * + * * @return hive partition columns */ public String[] getHivePartitionColumns() @@ -261,7 +261,7 @@ public class HiveOutputModule implements Module /** * Names of the columns on which hive data should be partitioned - * + * * @param hivePartitionColumns * Hive partition columns */ @@ -273,7 +273,7 @@ public class HiveOutputModule implements Module /** * Data types of the columns on which hive data should be partitioned. This * sequence should match to the fields in hivePartitionColumns - * + * * @return Hive partition column data types */ public FIELD_TYPE[] getHivePartitionColumnDataTypes() @@ -284,7 +284,7 @@ public class HiveOutputModule implements Module /** * Data types of the columns on which hive data should be partitioned. This * sequence should match to the fields in hivePartitionColumns - * + * * @param hivePartitionColumnDataTypes * Hive partition column data types */ @@ -296,7 +296,7 @@ public class HiveOutputModule implements Module /** * Expressions for the hive partition columns. This sequence should match to * the fields in hivePartitionColumns - * + * * @return Expressions for hive partition columns */ public String[] getExpressionsForHivePartitionColumns() @@ -307,7 +307,7 @@ public class HiveOutputModule implements Module /** * Expressions for the hive partition columns. This sequence should match to * the fields in hivePartitionColumns - * + * * @param expressionsForHivePartitionColumns * Expressions for hive partition columns */ @@ -318,7 +318,7 @@ public class HiveOutputModule implements Module /** * The maximum length in bytes of a rolling file. - * + * * @return maximum size of file */ public Long getMaxLength() @@ -328,7 +328,7 @@ public class HiveOutputModule implements Module /** * The maximum length in bytes of a rolling file. - * + * * @param maxLength * maximum size of file */ @@ -339,7 +339,7 @@ public class HiveOutputModule implements Module /** * Connection URL for connecting to hive. - * + * * @return database url */ public String getDatabaseUrl() @@ -349,7 +349,7 @@ public class HiveOutputModule implements Module /** * Connection URL for connecting to hive. - * + * * @param databaseUrl * database url */ @@ -360,7 +360,7 @@ public class HiveOutputModule implements Module /** * Driver for connecting to hive. - * + * * @return database driver */ public String getDatabaseDriver() @@ -370,7 +370,7 @@ public class HiveOutputModule implements Module /** * Driver for connecting to hive. - * + * * @param databaseDriver * database driver */ @@ -381,7 +381,7 @@ public class HiveOutputModule implements Module /** * Username for connecting to hive - * + * * @return user name */ public String getUserName() @@ -391,7 +391,7 @@ public class HiveOutputModule implements Module /** * Username for connecting to hive - * + * * @param username * user name */ @@ -402,7 +402,7 @@ public class HiveOutputModule implements Module /** * Password for connecting to hive - * + * * @return password */ public String getPassword() @@ -412,7 +412,7 @@ public class HiveOutputModule implements Module /** * Password for connecting to hive - * + * * @param password * password */ @@ -423,7 +423,7 @@ public class HiveOutputModule implements Module /** * Table name for writing data into hive - * + * * @return table name */ public String getTablename() @@ -433,7 +433,7 @@ public class HiveOutputModule implements Module /** * Table name for writing data into hive - * + * * @param tablename * table name */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java index 772399d..ad5c3fa 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java @@ -110,20 +110,20 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa metadata.get(clusters[i]).put(topic, ptis); break; } - + logger.warn("Partition metadata for topic {} is null. retrying...", topic); - + } catch (Exception e) { logger.warn("Got Exception when trying get partition info for topic {}.", topic, e); } - + try { Thread.sleep(100); } catch (Exception e1) { //ignore } } //end while - + if (tryTime == 0) { throw new RuntimeException("Get partition info for topic completely failed. Please check the log file. topic name: " + topic); } @@ -183,8 +183,8 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa } metadataRefreshClients = null; } - - + + @Override public void partitioned(Map<Integer, Partition<AbstractKafkaInputOperator>> map) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java ---------------------------------------------------------------------- diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java index 143a5bd..fa4856e 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java @@ -228,7 +228,7 @@ public class KafkaConsumerWrapper implements Closeable } } } - + protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e, KafkaConsumer<byte[], byte[]> consumer) { // if initialOffset is set to EARLIST or LATEST @@ -244,7 +244,7 @@ public class KafkaConsumerWrapper implements Closeable } else { consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0])); } - + } /** http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java index 8440615..4e97d72 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java @@ -66,7 +66,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase private String partition = null; private String testName = ""; - + public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator; public class KafkaTestInfo extends TestWatcher @@ -86,11 +86,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase this.desc = description; } } - + @Rule public final KafkaTestInfo testInfo = new KafkaTestInfo(); - - + + @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}") public static Collection<Object[]> testScenario() { @@ -116,7 +116,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase tupleCollection.clear(); //reset count for next new test case k = 0; - + createTopic(0, testName); if (hasMultiCluster) { createTopic(1, testName); @@ -146,14 +146,14 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase private static final int failureTrigger = 3 * scale; private static final int tuplesPerWindow = 5 * scale; private static final int waitTime = 60000 + 300 * scale; - - //This latch was used to count the END_TUPLE, but the order of tuple can't be guaranteed, + + //This latch was used to count the END_TUPLE, but the order of tuple can't be guaranteed, //so, count valid tuple instead. private static CountDownLatch latch; private static boolean hasFailure = false; private static int k = 0; private static Thread monitorThread; - + /** * Test Operator to collect tuples from KafkaSingleInputStringOperator. * @@ -179,7 +179,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase transient List<String> windowTupleCollector = Lists.newArrayList(); private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>(); private int endTuples = 0; - + @Override public void setup(Context.OperatorContext context) { @@ -196,7 +196,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase endTuples = 0; } - + public void processTuple(byte[] bt) { String tuple = new String(bt); @@ -208,10 +208,10 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) { endTuples++; } - + windowTupleCollector.add(tuple); } - + @Override public void endWindow() { @@ -231,7 +231,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase //discard the tuples of this window if except happened int tupleSize = windowTupleCollector.size(); tupleCollection.addAll(windowTupleCollector); - + int countDownTupleSize = countDownAll ? tupleSize : endTuples; if (latch != null) { @@ -303,8 +303,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase // each broker should get a END_TUPLE message latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers); - logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}", - testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition); + logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {}; hasMultiPartition: {}, partition: {}", + testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition); // Start producer KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster); @@ -313,7 +313,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase t.start(); int expectedReceiveCount = totalCount + totalBrokers; - + // Create DAG for testing. LocalMode lma = LocalMode.newInstance(); DAG dag = lma.getDAG(); @@ -346,7 +346,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase LocalMode.Controller lc = lma.getController(); lc.setHeartbeatMonitoringEnabled(false); - //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(), + //let the Controller to run the inside another thread. It is almost same as call Controller.runAsync(), //but Controller.runAsync() don't expose the thread which run it, so we don't know when the thread will be terminated. //create this thread and then call join() to make sure the Controller shutdown completely. monitorThread = new Thread((StramLocalCluster)lc, "master"); @@ -363,9 +363,9 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase } catch (Exception e) { logger.warn(e.getMessage()); } - + t.join(); - + if (!notTimeout || expectedReceiveCount != tupleCollection.size()) { logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(), expectedReceiveCount, testName, tupleCollection); @@ -373,13 +373,13 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: " + tupleCollection, notTimeout); // Check results - Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection, + Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection, expectedReceiveCount == tupleCollection.size()); - + logger.info("End of test case: {}", testName); } - + private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag) { operator.setHoldingBufferSize(5000); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java index e6256f1..21f8977 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java @@ -34,7 +34,7 @@ import kafka.utils.VerifiableProperties; public class KafkaTestPartitioner implements Partitioner { public KafkaTestPartitioner(VerifiableProperties props) { - + } public KafkaTestPartitioner() { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java ---------------------------------------------------------------------- diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java index 2f24a8a..ca6cc98 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java @@ -106,7 +106,7 @@ public class KafkaTestProducer implements Runnable } private transient List<Future<RecordMetadata>> sendTasks = Lists.newArrayList(); - + private void generateMessages() { // Create dummy message @@ -140,12 +140,12 @@ public class KafkaTestProducer implements Runnable sendTasks.add(producer.send(new ProducerRecord<>(topic, "", msg))); } } - + producer.flush(); if (producer1!=null) { producer1.flush(); } - + try { for (Future<RecordMetadata> task : sendTasks) { task.get(30, TimeUnit.SECONDS); @@ -153,7 +153,7 @@ public class KafkaTestProducer implements Runnable } catch (Exception e) { throw new RuntimeException(e); } - + close(); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java b/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java index cd61e20..8c8b83a 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/gpo/SerdeMapPrimitive.java @@ -74,7 +74,7 @@ public class SerdeMapPrimitive implements Serde GPOType gpoType = GPOType.GPO_TYPE_ARRAY[type.ordinal()]; bytes.add(gpoType.serialize(object)); } - + @Override public synchronized Object deserializeObject(byte[] objectBytes, MutableInt offset) { @@ -87,7 +87,7 @@ public class SerdeMapPrimitive implements Serde int typeOrdinal = GPOUtils.deserializeInt(objectBytes, offset); GPOType gpoType = GPOType.GPO_TYPE_ARRAY[typeOrdinal]; Object key = gpoType.deserialize(objectBytes, offset); - + typeOrdinal = GPOUtils.deserializeInt(objectBytes, offset); gpoType = GPOType.GPO_TYPE_ARRAY[typeOrdinal]; Object value = gpoType.deserialize(objectBytes, offset); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java index 83e8634..4d631c3 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/query/WindowBoundedService.java @@ -104,7 +104,7 @@ public class WindowBoundedService implements Component<OperatorContext> mutex.release(); executorThread.shutdown(); - + try { executorThread.awaitTermination(10000L + executeIntervalMillis, TimeUnit.MILLISECONDS); } catch (InterruptedException ex) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java index 2333dbb..59625f9 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java @@ -2201,7 +2201,7 @@ public class DimensionalConfigurationSchema { return getDimensionsDescriptorIDToIncrementalAggregatorIDs(); } - + public List<IntArrayList> getDimensionsDescriptorIDToCompositeAggregatorIDs() { return dimensionsDescriptorIDToCompositeAggregatorIDs; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java index 6138971..4fef2df 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java @@ -100,7 +100,7 @@ public class DimensionalSchema implements Schema new Fields(Sets.newHashSet(FIELD_TIME_FROM, FIELD_TIME_TO))); public static final String FIELD_RESPONSE_DELAY_MILLS = "responseDelayMillis"; - + /** * The from value for the schema. Null if there is no from value. */ @@ -164,7 +164,7 @@ public class DimensionalSchema implements Schema private int schemaID = Schema.DEFAULT_SCHEMA_ID; protected long responseDelayMillis; - + /** * Constructor for serialization */ @@ -249,7 +249,7 @@ public class DimensionalSchema implements Schema long responseDelayMillis) { this(schemaStub, - configurationSchema, + configurationSchema, responseDelayMillis); this.schemaID = schemaID; } @@ -391,7 +391,7 @@ public class DimensionalSchema implements Schema schema.put(SnapshotSchema.FIELD_SCHEMA_TYPE, DimensionalSchema.SCHEMA_TYPE); schema.put(SnapshotSchema.FIELD_SCHEMA_VERSION, DimensionalSchema.SCHEMA_VERSION); - + //responseDelayMillis if (responseDelayMillis > 0) { schema.put(FIELD_RESPONSE_DELAY_MILLS, responseDelayMillis); @@ -459,10 +459,10 @@ public class DimensionalSchema implements Schema for (int combinationID = 0; combinationID < configurationSchema.getDimensionsDescriptorIDToKeys().size(); combinationID++) { - + //TODO: the auto-generated combination for computation of composite aggregator will be added. //should remove it. - + Fields fields = configurationSchema.getDimensionsDescriptorIDToKeys().get(combinationID); Map<String, Set<String>> fieldToAggregatorAdditionalValues = configurationSchema.getDimensionsDescriptorIDToFieldToAggregatorAdditionalValues().get(combinationID); @@ -515,7 +515,7 @@ public class DimensionalSchema implements Schema combination.put(DimensionalConfigurationSchema.FIELD_DIMENSIONS_ADDITIONAL_VALUES, additionalValueArray); } - + dimensions.put(combination); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java index 8260c81..4460b51 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/Schema.java @@ -34,7 +34,7 @@ public interface Schema public static final String FIELD_SCHEMA_KEYS = "schemaKeys"; public static final String FIELD_SCHEMA = "schema"; public static final String FIELD_SCHEMA_TAGS = "tags"; - + /** * The id of the schema. This is relevant for operators which support serving multiple schemas, * in which each schema will need a unique id. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java index 5010580..b1e6d36 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/SnapshotSchema.java @@ -245,7 +245,7 @@ public class SnapshotSchema implements Schema schemaJSON = schema.toString(); } - + /** * This is a helper method which sets the JSON that represents this schema. * @param schemaJSON The JSON that represents this schema. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java index 0b03e79..19e142b 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/snapshot/AbstractAppDataSnapshotServer.java @@ -107,9 +107,9 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper * The queryExecutor execute the query and return the result. */ protected QueryExecutor<Query, Void, MutableLong, Result> queryExecutor; - + private Set<String> tags; - + @AppData.QueryPort @InputPortFieldAnnotation(optional = true) public final transient DefaultInputPort<String> query = new DefaultInputPort<String>() @@ -120,7 +120,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper processQuery(queryJSON); } }; - + /** * process the query send. * provide this method to give sub class a chance to override. @@ -169,7 +169,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper currentData.add(gpoRow); } } - + /** * Create operator. */ @@ -199,11 +199,11 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper public void setup(OperatorContext context) { setupSchema(); - + schemaRegistry = new SchemaRegistrySingle(schema); //Setup for query processing setupQueryProcessor(); - + queryDeserializerFactory = new MessageDeserializerFactory(SchemaQuery.class, DataQuerySnapshot.class); queryDeserializerFactory.setContext(DataQuerySnapshot.class, schemaRegistry); @@ -228,7 +228,7 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper protected void setupQueryProcessor() { - queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor, + queryProcessor = QueryManagerSynchronous.newInstance(queryExecutor == null ? new SnapshotComputer() : queryExecutor, new AppDataWindowEndQueueManager<Query, Void>()); } @@ -378,6 +378,6 @@ public abstract class AbstractAppDataSnapshotServer<INPUT_EVENT> implements Oper { this.tags = tags; } - - + + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java index 2b503ed..9b9eb8d 100644 --- a/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java +++ b/library/src/main/java/com/datatorrent/lib/bandwidth/BandwidthPartitioner.java @@ -42,7 +42,7 @@ public class BandwidthPartitioner<T extends BandwidthLimitingOperator> extends S /** * This constructor is used to create the partitioner from a property. - * + * * @param value A string which is an integer of the number of partitions to create */ public BandwidthPartitioner(String value) @@ -52,7 +52,7 @@ public class BandwidthPartitioner<T extends BandwidthLimitingOperator> extends S /** * This creates a partitioner which creates partitonCount partitions. - * + * * @param partitionCount The number of partitions to create. */ public BandwidthPartitioner(int partitionCount) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/codec/package-info.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/codec/package-info.java b/library/src/main/java/com/datatorrent/lib/codec/package-info.java index ded8689..d876e3f 100644 --- a/library/src/main/java/com/datatorrent/lib/codec/package-info.java +++ b/library/src/main/java/com/datatorrent/lib/codec/package-info.java @@ -17,7 +17,7 @@ * under the License. */ /** - * Shared codec implementations. + * Shared codec implementations. */ @org.apache.hadoop.classification.InterfaceStability.Evolving package com.datatorrent.lib.codec; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/converter/Converter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/converter/Converter.java b/library/src/main/java/com/datatorrent/lib/converter/Converter.java index ef999e4..3799cd2 100644 --- a/library/src/main/java/com/datatorrent/lib/converter/Converter.java +++ b/library/src/main/java/com/datatorrent/lib/converter/Converter.java @@ -24,7 +24,7 @@ import org.apache.hadoop.classification.InterfaceStability; * Operators that are converting tuples from one format to another must * implement this interface. Eg. Parsers or formatters , that parse data of * certain format and convert them to another format. - * + * * @param <INPUT> * @param <OUTPUT> * @since 3.2.0 @@ -35,7 +35,7 @@ public interface Converter<INPUT, OUTPUT> /** * Provide the implementation for converting tuples from one format to the * other - * + * * @param tuple tuple of certain format * @return OUTPUT tuple of converted format */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java b/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java index 4211d3d..76759a4 100644 --- a/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java +++ b/library/src/main/java/com/datatorrent/lib/db/KeyValueStore.java @@ -62,7 +62,7 @@ public interface KeyValueStore extends Connectable /** * Removes the key and the value given the key - * + * * @param key */ public void remove(Object key); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java index 90111d8..38d44a0 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPOJOOutputOperator.java @@ -94,7 +94,7 @@ public abstract class AbstractJdbcPOJOOutputOperator extends AbstractJdbcTransac super(); columnFieldGetters = Lists.newArrayList(); } - + protected static class ActiveFieldInfo { final FieldInfo fieldInfo; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java index f9fb714..6bd5121 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/AbstractJdbcPollInputOperator.java @@ -79,11 +79,11 @@ import static org.jooq.impl.DSL.field; * partitions for fetching the existing data in the table. And an additional * single partition for polling additive data. Assumption is that there is an * ordered unique column using which range queries can be formed<br> - * + * * Only newly added data will be fetched by the polling jdbc partition, also * assumption is rows won't be added or deleted in middle during scan. - * - * + * + * * @displayName Jdbc Polling Input Operator * @category Input * @tags database, sql, jdbc, partitionable, idepotent, pollable http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java index d139379..9a76103 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPollInputOperator.java @@ -33,7 +33,7 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation; /** * A concrete implementation for {@link AbstractJdbcPollInputOperator} to * consume data from jdbc store and emit comma separated values <br> - * + * * @displayName Jdbc Polling Input Operator * @category Input * @tags database, sql, jdbc http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java index 7dfe4e9..f11f2bc 100644 --- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileImpl.java @@ -32,8 +32,8 @@ import org.apache.hadoop.io.file.tfile.TFile.Writer; /** * A TFile wrapper with FileAccess API * <ul> - * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li> - * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li> + * <li>{@link TFileImpl.DefaultTFileImpl} return default TFile {@link Reader} and {@link Writer} for IO operations</li> + * <li>{@link TFileImpl.DTFileImpl} return DTFile {@link org.apache.hadoop.io.file.tfile.DTFile.Reader}(which is faster than default TFile reader) and {@link Writer} for IO operations</li> * </ul> * * @since 2.0.0 @@ -44,16 +44,16 @@ public abstract class TFileImpl extends FileAccessFSImpl private int minBlockSize = 64 * 1024; private String compressName = TFile.COMPRESSION_NONE; - + private String comparator = "memcmp"; - + private int chunkSize = 1024 * 1024; - + private int inputBufferSize = 256 * 1024; - + private int outputBufferSize = 256 * 1024; - + private void setupConfig(Configuration conf) { conf.set("tfile.io.chunk.size", String.valueOf(chunkSize)); @@ -69,7 +69,7 @@ public abstract class TFileImpl extends FileAccessFSImpl setupConfig(fs.getConf()); return new TFileWriter(fsdos, minBlockSize, compressName, comparator, fs.getConf()); } - + public int getMinBlockSize() { return minBlockSize; @@ -140,13 +140,13 @@ public abstract class TFileImpl extends FileAccessFSImpl { this.outputBufferSize = outputBufferSize; } - + /** * Return {@link TFile} {@link Reader} */ public static class DefaultTFileImpl extends TFileImpl { - + @Override public FileReader getReader(long bucketKey, String fileName) throws IOException { @@ -155,15 +155,15 @@ public abstract class TFileImpl extends FileAccessFSImpl super.setupConfig(fs.getConf()); return new TFileReader(fsdis, fileLength, fs.getConf()); } - + } - + /** * Return {@link DTFile} {@link org.apache.hadoop.io.file.tfile.DTFile.Reader} */ public static class DTFileImpl extends TFileImpl { - + @Override public FileReader getReader(long bucketKey, String fileName) throws IOException { @@ -172,7 +172,7 @@ public abstract class TFileImpl extends FileAccessFSImpl super.setupConfig(fs.getConf()); return new DTFileReader(fsdis, fileLength, fs.getConf()); } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java index 7e9d544..da724d4 100644 --- a/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java +++ b/library/src/main/java/com/datatorrent/lib/fileaccess/TFileWriter.java @@ -34,15 +34,15 @@ import org.apache.hadoop.io.file.tfile.TFile.Writer; public final class TFileWriter implements FileAccess.FileWriter { private Writer writer; - + private FSDataOutputStream fsdos; - + public TFileWriter(FSDataOutputStream stream, int minBlockSize, String compressName, String comparator, Configuration conf) throws IOException { this.fsdos = stream; writer = new Writer(stream, minBlockSize, compressName, comparator, conf); - + } @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java index 2a54e0f..6fccf1e 100644 --- a/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java +++ b/library/src/main/java/com/datatorrent/lib/filter/FilterOperator.java @@ -56,7 +56,7 @@ import com.datatorrent.lib.util.PojoUtils; * - truePort emits POJOs meeting the given condition * - falsePort emits POJOs not meeting the given condition * - error port emits any error situation while evaluating expression - * + * * * @since 3.5.0 */ @@ -234,6 +234,6 @@ public class FilterOperator extends BaseOperator implements Operator.ActivationL { return expressionFunctions; } - + private static final Logger logger = LoggerFactory.getLogger(FilterOperator.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java index db8dbc4..ef3c304 100644 --- a/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java +++ b/library/src/main/java/com/datatorrent/lib/formatter/Formatter.java @@ -40,7 +40,7 @@ import com.datatorrent.lib.converter.Converter; * <b>err</b>: emits <Object> error port that emits input tuple that could * not be converted<br> * <br> - * + * * @displayName Parser * @tags parser converter * @param <OUTPUT> @@ -99,7 +99,7 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte /** * Get the class that needs to be formatted - * + * * @return Class<?> */ public Class<?> getClazz() @@ -109,7 +109,7 @@ public abstract class Formatter<OUTPUT> extends BaseOperator implements Converte /** * Set the class of tuple that needs to be formatted - * + * * @param clazz */ public void setClazz(Class<?> clazz) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java index 840b550..a784f89 100644 --- a/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java +++ b/library/src/main/java/com/datatorrent/lib/formatter/JsonFormatter.java @@ -29,7 +29,7 @@ import com.datatorrent.api.Context.OperatorContext; /** * Operator that converts POJO to JSON string <br> - * + * * @displayName JsonFormatter * @category Formatter * @tags pojo json formatter http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java index 21a7b6a..78dc344 100644 --- a/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java +++ b/library/src/main/java/com/datatorrent/lib/formatter/XmlFormatter.java @@ -88,7 +88,7 @@ public class XmlFormatter extends Formatter<String> * Gets the alias This is an optional step. Without it XStream would work * fine, but the XML element names would contain the fully qualified name of * each class (including package) which would bulk up the XML a bit. - * + * * @return alias. */ public String getAlias() @@ -100,7 +100,7 @@ public class XmlFormatter extends Formatter<String> * Sets the alias This is an optional step. Without it XStream would work * fine, but the XML element names would contain the fully qualified name of * each class (including package) which would bulk up the XML a bit. - * + * * @param alias * . */ @@ -112,7 +112,7 @@ public class XmlFormatter extends Formatter<String> /** * Gets the date format e.g dd/mm/yyyy - this will be how a date would be * formatted - * + * * @return dateFormat. */ public String getDateFormat() @@ -123,7 +123,7 @@ public class XmlFormatter extends Formatter<String> /** * Sets the date format e.g dd/mm/yyyy - this will be how a date would be * formatted - * + * * @param dateFormat * . */ @@ -134,7 +134,7 @@ public class XmlFormatter extends Formatter<String> /** * Returns true if pretty print is enabled. - * + * * @return prettyPrint */ public boolean isPrettyPrint() @@ -144,7 +144,7 @@ public class XmlFormatter extends Formatter<String> /** * Sets pretty print option. - * + * * @param prettyPrint */ public void setPrettyPrint(boolean prettyPrint) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java index 1401100..16d220c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/AbstractFTPInputOperator.java @@ -174,7 +174,7 @@ public abstract class AbstractFTPInputOperator<T> extends AbstractFileInputOpera /** * An {@link AbstractFTPInputOperator} that splits file into lines and emits them. - * + * * @displayName FTP String Input * @category Input * @tags ftp http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java b/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java index 9e18e1b..64c066b 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/BlockWriter.java @@ -59,7 +59,7 @@ public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader. * Directory under application directory where blocks gets stored */ private String blocksDirectory = DEFAULT_BLOCKS_DIR; - + /** * List of FileBlockMetadata received in the current window. */ @@ -206,7 +206,7 @@ public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader. } } } - + /** * Directory under application directory where blocks gets stored * @return blocks directory @@ -215,7 +215,7 @@ public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader. { return blocksDirectory; } - + /** * Directory under application directory where blocks gets stored * @param blocksDirectory blocks directory @@ -230,7 +230,7 @@ public class BlockWriter extends AbstractFileOutputOperator<AbstractBlockReader. { } - + private static final Logger LOG = LoggerFactory.getLogger(BlockWriter.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java index ad55358..60fd93c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java +++ b/library/src/main/java/com/datatorrent/lib/io/block/FSSliceReader.java @@ -26,7 +26,7 @@ import com.datatorrent.netlet.util.Slice; * * @category Input * @tags fs - * + * * @since 2.1.0 */ @StatsListener.DataQueueSize http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java index 7e6bd2f..38c8e96 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileSplitter.java @@ -389,7 +389,7 @@ public abstract class AbstractFileSplitter extends BaseOperator this.filePath = filePath; discoverTime = System.currentTimeMillis(); } - + protected FileMetadata(FileMetadata fileMetadata) { this(); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java index 3b60d4a..99a8fb6 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperator.java @@ -44,8 +44,8 @@ public abstract class AbstractSingleFileOutputOperator<INPUT> extends AbstractFi protected String outputFileName; /** - * partitionedFileName string format specifier - e.g. fileName_physicalPartionId -> %s_%d + * partitionedFileName string format specifier + e.g. fileName_physicalPartionId -> %s_%d */ private String partitionedFileNameformat = "%s_%d"; @@ -105,17 +105,17 @@ public abstract class AbstractSingleFileOutputOperator<INPUT> extends AbstractFi { return partitionedFileNameformat; } - + /** * @param partitionedFileNameformat * string format specifier for the partitioned file name. It should have one %s and one %d. - * e.g. fileName_physicalPartionId -> %s_%d + * e.g. fileName_physicalPartionId -> %s_%d */ public void setPartitionedFileNameformat(String partitionedFileNameformat) { this.partitionedFileNameformat = partitionedFileNameformat; } - + /** * @return * Derived name for file based on physicalPartitionId @@ -124,5 +124,5 @@ public abstract class AbstractSingleFileOutputOperator<INPUT> extends AbstractFi { return partitionedFileName; } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java index 04aa8cf..db7d7c5 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileMerger.java @@ -118,7 +118,7 @@ public class FileMerger extends FileStitcher<OutputFileMetadata> OutputStream outputStream = outputFS.create(partFilePath); return outputStream; } - + /** * Flag to control if existing file with same name should be overwritten * @return Flag to control if existing file with same name should be overwritten http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java index 5f5c717..c4ad9d3 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileStitcher.java @@ -48,7 +48,7 @@ import com.datatorrent.lib.io.fs.Synchronizer.StitchedFileMetaData; * This is generic File Stitcher which can be used to merge data from one or * more files into single stitched file. StitchedFileMetaData defines * constituents of the stitched file. - * + * * This class uses Reconciler to * * @since 3.4.0 @@ -75,7 +75,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc * Path for blocks directory */ protected transient String blocksDirectoryPath; - + /** * Directory under application directory where blocks gets stored */ @@ -133,8 +133,8 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc super.setup(context); // Calling it at the end as the reconciler thread uses resources allocated above. } - /* - * Calls super.endWindow() and sets counters + /* + * Calls super.endWindow() and sets counters * @see com.datatorrent.api.BaseOperator#endWindow() */ @Override @@ -146,7 +146,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc stitchedFileMetaData = doneTuples.peek(); // If a tuple is present in doneTuples, it has to be also present in successful/failed/skipped // as processCommittedData adds tuple in successful/failed/skipped - // and then reconciler thread add that in doneTuples + // and then reconciler thread add that in doneTuples if (successfulFiles.contains(stitchedFileMetaData)) { successfulFiles.remove(stitchedFileMetaData); LOG.debug("File copy successful: {}", stitchedFileMetaData.getStitchedFileRelativePath()); @@ -167,7 +167,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc } /** - * + * * @return Application FileSystem instance * @throws IOException */ @@ -177,7 +177,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc } /** - * + * * @return Destination FileSystem instance * @throws IOException */ @@ -240,7 +240,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc /** * Read data from block files and write to output file. Information about * which block files should be read is specified in outFileMetadata - * + * * @param stitchedFileMetaData * @throws IOException */ @@ -287,7 +287,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc /** * Writing all Stitch blocks to temporary file - * + * * @param stitchedFileMetaData * @throws IOException * @throws BlockNotFoundException @@ -312,7 +312,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc /** * Moving temp output file to final file - * + * * @param stitchedFileMetaData * @throws IOException */ @@ -324,7 +324,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc /** * Moving temp output file to final file - * + * * @param tempOutFilePath * Temporary output file * @param destination @@ -351,7 +351,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc throw new RuntimeException("Unable to move file from " + src + " to " + dst); } } - + /** * Directory under application directory where blocks gets stored * @return blocks directory @@ -360,7 +360,7 @@ public class FileStitcher<T extends StitchedFileMetaData> extends AbstractReconc { return blocksDirectory; } - + /** * Directory under application directory where blocks gets stored * @param blocksDirectory blocks directory http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java index 1029dff..baf2297 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamCodec.java @@ -41,7 +41,7 @@ public class FilterStreamCodec { filterStream = new GZIPOutputStream(outputStream); } - + @Override public void finalizeContext() throws IOException { @@ -80,7 +80,7 @@ public class FilterStreamCodec } /** - * This provider is useful when writing to a single output stream so that the same cipher can be reused + * This provider is useful when writing to a single output stream so that the same cipher can be reused */ public static class CipherSimpleStreamProvider implements FilterStreamProvider<CipherOutputStream, OutputStream> { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java index 35530a3..dd0393a 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamContext.java @@ -61,7 +61,7 @@ public interface FilterStreamContext<F extends FilterOutputStream> } } - + public static class SimpleFilterStreamContext<F extends FilterOutputStream> extends BaseFilterStreamContext<F> { public SimpleFilterStreamContext(F filterStream) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java index 75e6e5f..6debaec 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FilterStreamProvider.java @@ -39,9 +39,9 @@ import com.google.common.collect.Maps; public interface FilterStreamProvider<F extends FilterOutputStream, S extends OutputStream> { public FilterStreamContext<F> getFilterStreamContext(S outputStream) throws IOException; - + public void reclaimFilterStreamContext(FilterStreamContext<F> filterStreamContext); - + abstract class SimpleFilterReusableStreamProvider<F extends FilterOutputStream, S extends OutputStream> implements FilterStreamProvider<F, S> { @@ -67,7 +67,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou reusableContexts.put(outputStream, filterStreamContext); } } - + protected abstract FilterStreamContext<F> createFilterStreamContext(OutputStream outputStream) throws IOException; } @@ -78,7 +78,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou public static class FilterChainStreamProvider<F extends FilterOutputStream, S extends OutputStream> implements FilterStreamProvider<F, S> { private List<FilterStreamProvider<?,?>> streamProviders = new ArrayList<FilterStreamProvider<?, ?>>(); - + public Collection<FilterStreamProvider<?,?>> getStreamProviders() { return Collections.unmodifiableList(streamProviders); @@ -88,7 +88,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou { streamProviders.add(streamProvider); } - + @Override public FilterStreamContext<F> getFilterStreamContext(S outputStream) throws IOException { @@ -120,7 +120,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou private class FilterChainStreamContext extends FilterStreamContext.BaseFilterStreamContext implements FilterStreamContext { - + private List<FilterStreamContext<?>> streamContexts = new ArrayList<FilterStreamContext<?>>(); public void pushStreamContext(FilterStreamContext<?> streamContext) @@ -128,7 +128,7 @@ public interface FilterStreamProvider<F extends FilterOutputStream, S extends Ou streamContexts.add(0, streamContext); filterStream = streamContext.getFilterStream(); } - + public Collection<FilterStreamContext<?>> getStreamContexts() { return Collections.unmodifiableCollection(streamContexts); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java index f4d1a38..5fbc580 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileCopyModule.java @@ -37,9 +37,9 @@ import com.datatorrent.netlet.util.Slice; * copy files from any file system to HDFS. This module supports parallel write * to multiple blocks of the same file and then stitching those blocks in * original sequence. - * + * * Essential operators are wrapped into single component using Module API. - * + * * * @since 3.4.0 */ @@ -108,7 +108,7 @@ public class HDFSFileCopyModule implements Module /** * Path of the output directory. Relative path of the files copied will be * maintained w.r.t. source directory and output directory - * + * * @return output directory path */ public String getOutputDirectoryPath() @@ -119,7 +119,7 @@ public class HDFSFileCopyModule implements Module /** * Path of the output directory. Relative path of the files copied will be * maintained w.r.t. source directory and output directory - * + * * @param outputDirectoryPath * output directory path */ @@ -130,7 +130,7 @@ public class HDFSFileCopyModule implements Module /** * Flag to control if existing file with same name should be overwritten - * + * * @return Flag to control if existing file with same name should be * overwritten */ @@ -141,7 +141,7 @@ public class HDFSFileCopyModule implements Module /** * Flag to control if existing file with same name should be overwritten - * + * * @param overwriteOnConflict * Flag to control if existing file with same name should be * overwritten http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java index 6f72484..cd2bee3 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/HDFSFileMerger.java @@ -102,7 +102,7 @@ public class HDFSFileMerger extends FileMerger /** * Fast merge using HDFS block concat - * + * * @param outputFileMetadata * @throws IOException */ @@ -130,7 +130,7 @@ public class HDFSFileMerger extends FileMerger /** * Attempt for recovery if block concat is successful but temp file is not * moved to final file - * + * * @param outputFileMetadata * @throws IOException */ @@ -179,7 +179,7 @@ public class HDFSFileMerger extends FileMerger /** * Checks if fast merge is possible for given settings for blocks directory, * application file system, block size - * + * * @param outputFileMetadata * @throws IOException * @throws BlockNotFoundException http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java b/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java index 8632343..a325a2f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/Synchronizer.java @@ -115,7 +115,7 @@ public class Synchronizer extends BaseOperator /** * Checks if all blocks for given file are received. Sends triggger when all * blocks are received. - * + * * @param fileMetadata * @param receivedBlocksMetadata */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java index efda6b0..f26ecf4 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSSinglePortOutputOperator.java @@ -55,7 +55,7 @@ public abstract class AbstractJMSSinglePortOutputOperator<T> extends AbstractJMS { @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(AbstractJMSSinglePortOutputOperator.class); - + /** * Convert to and send message. * @param tuple http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java index 772464a..99c2eeb 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/JMSBase.java @@ -139,7 +139,7 @@ public class JMSBase { this.connectionFactoryProperties = connectionFactoryProperties; } - + /** * Get the fully qualified class-name of the connection factory that is used by this * builder to instantiate the connection factory @@ -150,7 +150,7 @@ public class JMSBase { return connectionFactoryClass; } - + /** * Set the fully qualified class-name of the connection factory that is used by this * builder to instantiate the connection factory @@ -213,7 +213,7 @@ public class JMSBase { return destination; } - + /** * gets the connection factory class-name used by the default connection factory builder * @@ -244,7 +244,7 @@ public class JMSBase } return (DefaultConnectionFactoryBuilder)connectionFactoryBuilder; } - + /** * Sets the connection factory class-name used by the default connection factory builder * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java index 5f09a4b..e403979 100644 --- a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java +++ b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java @@ -42,7 +42,7 @@ import com.datatorrent.api.DefaultInputPort; * <b>integerResult</b>: emits Integer<br> * <b>longResult</b>: emits Long<br> * <br> - * + * * @displayName Abstract Aggregate Calculator * @category Math * @tags aggregate, collection http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java index 91cc9ba..f7283e0 100644 --- a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java +++ b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java @@ -22,9 +22,9 @@ import org.w3c.dom.Node; import org.w3c.dom.NodeList; /** - * This operator extends the AbstractXmlCartesianProduct operator and implements the node value + * This operator extends the AbstractXmlCartesianProduct operator and implements the node value * as a key value pair of node name and the node's text value. - * + * * @displayName Abstract XML Key Value Cartesian Product * @category Math * @tags cartesian product, xml, multiple products, key value http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Division.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Division.java b/library/src/main/java/com/datatorrent/lib/math/Division.java index d05af18..f5a01aa 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Division.java +++ b/library/src/main/java/com/datatorrent/lib/math/Division.java @@ -58,17 +58,17 @@ public class Division extends BaseOperator * Array to store numerator inputs during window. */ private ArrayList<Number> numer = new ArrayList<Number>(); - + /** * Array to store denominator input during window. */ private ArrayList<Number> denom = new ArrayList<Number>(); - + /** * Number of pair processed in current window. */ private int index = 0; - + /** * Numerator input port. */ @@ -112,55 +112,55 @@ public class Division extends BaseOperator } } }; - + /** - * Long quotient output port. + * Long quotient output port. */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<Long> longQuotient = new DefaultOutputPort<Long>(); - + /** - * Integer quotient output port. + * Integer quotient output port. */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<Integer> integerQuotient = new DefaultOutputPort<Integer>(); - + /** - * Double quotient output port. + * Double quotient output port. */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<Double> doubleQuotient = new DefaultOutputPort<Double>(); /** - * Float quotient output port. + * Float quotient output port. */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<Float> floatQuotient = new DefaultOutputPort<Float>(); - + /** - * Long remainder output port. + * Long remainder output port. */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<Long> longRemainder = new DefaultOutputPort<Long>(); - + /** - * Integer remainder output port. + * Integer remainder output port. */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<Integer> integerRemainder = new DefaultOutputPort<Integer>(); - + /** - * Double remainder output port. + * Double remainder output port. */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<Double> doubleRemainder = new DefaultOutputPort<Double>(); - + /** - * Float remainder output port. + * Float remainder output port. */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<Float> floatRemainder = new DefaultOutputPort<Float>(); - + /** * Error data output port that emits a string. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Margin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Margin.java b/library/src/main/java/com/datatorrent/lib/math/Margin.java index 94e15d6..5d1872e 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Margin.java +++ b/library/src/main/java/com/datatorrent/lib/math/Margin.java @@ -24,7 +24,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.lib.util.BaseNumberValueOperator; /** - * This operator sums the division of numerator and denominator value arriving at input ports. + * This operator sums the division of numerator and denominator value arriving at input ports. * <p> * <br> * Margin Formula used by this operator: 1 - numerator/denominator.<br> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/MarginMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java index 7ef1f81..d1fa33f 100644 --- a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java +++ b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java @@ -54,7 +54,7 @@ public class MarginMap<K, V extends Number> extends BaseNumberKeyValueOperator<K { /** * Numerator input port that takes a map. - */ + */ public final transient DefaultInputPort<Map<K, V>> numerator = new DefaultInputPort<Map<K, V>>() { /** @@ -66,7 +66,7 @@ public class MarginMap<K, V extends Number> extends BaseNumberKeyValueOperator<K addTuple(tuple, numerators); } }; - + /** * Denominator input port that takes a map. */ @@ -101,7 +101,7 @@ public class MarginMap<K, V extends Number> extends BaseNumberKeyValueOperator<K val.add(e.getValue().doubleValue()); } } - + /* * Output margin port that emits hashmap. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Min.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Min.java b/library/src/main/java/com/datatorrent/lib/math/Min.java index 4b3fa23..a862f2e 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Min.java +++ b/library/src/main/java/com/datatorrent/lib/math/Min.java @@ -45,10 +45,10 @@ public class Min<V extends Number> extends BaseNumberValueOperator<V> implements * Computed low value. */ protected V low; - + // transient field protected boolean flag = false; - + /** * Input port that takes a number and compares to min and stores the new min. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java index dd56f7f..5396768 100644 --- a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java +++ b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java @@ -26,7 +26,7 @@ import com.datatorrent.common.util.BaseOperator; /** * Multiplies input tuple (Number) by the value of property "multiplier" and emits the result on respective ports. * <p> - * This operator emits the result as Long on port "longProduct", as Integer on port "integerProduct", as Double on port "doubleProduct", and as Float on port "floatProduct". + * This operator emits the result as Long on port "longProduct", as Integer on port "integerProduct", as Double on port "doubleProduct", and as Float on port "floatProduct". * Output is computed in current window.No state dependency among input tuples * This is a pass through operator * <br> @@ -79,22 +79,22 @@ public class MultiplyByConstant extends BaseOperator } }; - + /** * Long output port. */ public final transient DefaultOutputPort<Long> longProduct = new DefaultOutputPort<Long>(); - + /** * Integer output port. */ public final transient DefaultOutputPort<Integer> integerProduct = new DefaultOutputPort<Integer>(); - + /** * Double output port. */ public final transient DefaultOutputPort<Double> doubleProduct = new DefaultOutputPort<Double>(); - + /** * Float output port. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java index 286d72e..163f06b 100644 --- a/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java +++ b/library/src/main/java/com/datatorrent/lib/math/RunningAverage.java @@ -24,7 +24,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.common.util.BaseOperator; /** - * Calculate the running average of the input numbers and emit it at the end of the window. + * Calculate the running average of the input numbers and emit it at the end of the window. * <p> * This is an end of window operator.<br> * <br> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Sigma.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Sigma.java b/library/src/main/java/com/datatorrent/lib/math/Sigma.java index 6bfb9cf..1a9df60 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Sigma.java +++ b/library/src/main/java/com/datatorrent/lib/math/Sigma.java @@ -26,7 +26,7 @@ import com.datatorrent.api.annotation.OperatorAnnotation; * Adds incoming tuple to the state and emits the result of each addition on the respective ports. * <p> * The addition would go on forever.Result is emitted on four different data type ports:floatResult,integerResult,longResult,doubleResult. - * Input tuple object has to be an implementation of the interface Collection.Tuples are emitted on the output ports only if they are connected. + * Input tuple object has to be an implementation of the interface Collection.Tuples are emitted on the output ports only if they are connected. * This is done to avoid the cost of calling the functions when some ports are not connected. * This is a stateful pass through operator<br> * <b>Partitions : </b>, no will yield wrong results, no unifier on output port. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java b/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java index d97b905..9569074 100644 --- a/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java +++ b/library/src/main/java/com/datatorrent/lib/math/SingleVariableAbstractCalculus.java @@ -23,7 +23,7 @@ import com.datatorrent.api.DefaultInputPort; /** * Transforms the input into the output after applying appropriate mathematical function to it and emits result on respective ports. * <p> - * Emits the result as Long on port "longResult", as Integer on port "integerResult",as Double on port "doubleResult", and as Float on port "floatResult". + * Emits the result as Long on port "longResult", as Integer on port "integerResult",as Double on port "doubleResult", and as Float on port "floatResult". * This is a pass through operator<br> * <br> * <b>Ports</b>:<br> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/Sum.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Sum.java b/library/src/main/java/com/datatorrent/lib/math/Sum.java index 0f5e64f..0214268 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Sum.java +++ b/library/src/main/java/com/datatorrent/lib/math/Sum.java @@ -29,7 +29,7 @@ import com.datatorrent.lib.util.BaseNumberValueOperator; import com.datatorrent.lib.util.UnifierSumNumber; /** - * This operator implements Unifier interface and emits the sum of values at the end of window. + * This operator implements Unifier interface and emits the sum of values at the end of window. * <p> * This is an end of window operator. Application can turn this into accumulated * sum operator by setting cumulative flag to true. <br> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java index 7f36ef5..cc50fe1 100644 --- a/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java +++ b/library/src/main/java/com/datatorrent/lib/math/XmlKeyValueStringCartesianProduct.java @@ -28,7 +28,7 @@ import com.datatorrent.api.DefaultOutputPort; /** * An implementation of the AbstractXmlKeyValueCartesianProduct operator that takes in the xml document * as a String input and outputs the cartesian product as Strings. - * + * * @displayName Xml Key Value String Cartesian Product * @category Math * @tags cartesian product, string, xml @@ -38,7 +38,7 @@ public class XmlKeyValueStringCartesianProduct extends AbstractXmlKeyValueCartes { InputSource source = new InputSource(); - + /** * Output port that emits cartesian product as Strings. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/math/package-info.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/package-info.java b/library/src/main/java/com/datatorrent/lib/math/package-info.java index f583662..c22309b 100644 --- a/library/src/main/java/com/datatorrent/lib/math/package-info.java +++ b/library/src/main/java/com/datatorrent/lib/math/package-info.java @@ -22,10 +22,10 @@ * Most of the arithmetic operators come in three types based on their schema. * The operators whose names ends with "Map" (eg SumMap, MaxMap, MinMap) take in Map on input ports and emit HashMap. These operators use * round robin partitioning and would merge as per their functionality. - * <br> + * <br> * The operators whose names ends with "KeyVal" (eg SumKeyVal, MaxKeyVal, MinKeyVal) take in KeyValPair and emit KeyValPair. These operators use * sticky key partitioning and would merge using default pass through merge operator. - * <br> + * <br> * The operators whose names are just their function name (eg Sum, Min, Max) operate on same objects and emit a final result. These operators have no keys. * They partition in roundrobin and would merge as per their functionality. * <br> http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/parser/Parser.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/parser/Parser.java b/library/src/main/java/com/datatorrent/lib/parser/Parser.java index 4f591f1..0403dc9 100644 --- a/library/src/main/java/com/datatorrent/lib/parser/Parser.java +++ b/library/src/main/java/com/datatorrent/lib/parser/Parser.java @@ -40,7 +40,7 @@ import com.datatorrent.lib.converter.Converter; * <b>err</b>: emits <INPUT> error port that emits input tuple that could * not be converted<br> * <br> - * + * * @displayName Parser * @tags parser converter * @param <INPUT> @@ -108,7 +108,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co /** * Get the class that needs to be formatted - * + * * @return Class<?> */ public Class<?> getClazz() @@ -118,7 +118,7 @@ public abstract class Parser<INPUT, ERROROUT> extends BaseOperator implements Co /** * Set the class of tuple that needs to be formatted - * + * * @param clazz */ public void setClazz(Class<?> clazz)
