http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java index c8eeacc..bdf6fad 100644 --- a/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java +++ b/library/src/main/java/com/datatorrent/lib/parser/XmlParser.java @@ -54,7 +54,7 @@ import com.datatorrent.netlet.util.DTThrowable; * the Pojo Class. <br> * <b>dateFormats</b>: Comma separated string of date formats e.g * dd/mm/yyyy,dd-mmm-yyyy where first one would be considered default - * + * * @displayName XmlParser * @category Parsers * @tags xml pojo parser
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java index 8c22140..6c17529 100644 --- a/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java +++ b/library/src/main/java/com/datatorrent/lib/projection/ProjectionOperator.java @@ -59,7 +59,7 @@ import com.datatorrent.lib.util.PojoUtils; * - projected port emits POJOs with projected fields from input POJOs * - remainder port, if connected, emits POJOs with remainder fields from input POJOs * - error port emits input POJOs as is upon error situations - * + * * <b>Examples</b> * For {a, b, c} type of input tuples * - when selectFields = "" and dropFields = "", projected port shall emit {a, b, c} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java index 9532180..d6589ee 100644 --- a/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java +++ b/library/src/main/java/com/datatorrent/lib/script/ScriptOperator.java @@ -31,8 +31,8 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.common.util.BaseOperator; /** - * A base implementation of a BaseOperator for language script operator. Subclasses should provide the - implementation of getting the bindings and process method. + * A base implementation of a BaseOperator for language script operator. Subclasses should provide the + implementation of getting the bindings and process method. * Interface for language script operator. * <p> * @displayName Script @@ -55,13 +55,13 @@ public abstract class ScriptOperator extends BaseOperator } }; - + /** * Output outBindings port that emits a map of <String, Object>. */ @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<Map<String, Object>> outBindings = new DefaultOutputPort<Map<String, Object>>(); - + /** * Output result port that emits an object as the result. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java b/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java index d206071..a7f5147 100644 --- a/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java +++ b/library/src/main/java/com/datatorrent/lib/util/AbstractKeyValueStorageAgent.java @@ -32,10 +32,10 @@ import com.datatorrent.api.StorageAgent; /** * Abstract implementation of {@link ApplicationAwareStorageAgent} which can be * configured be KeyValue store witch implementation of {@link StorageAgentKeyValueStore} - * + * * NOTE - this should be picked from APEX-CORE once below feature is release * https://issues.apache.org/jira/browse/APEXCORE-283 - * + * * @param <S> * Store implementation * @@ -71,7 +71,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu /** * Return yarn application id of running application - * + * * @return */ public String getApplicationId() @@ -81,7 +81,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu /** * Set yarn application id - * + * * @param applicationId */ public void setApplicationId(String applicationId) @@ -92,7 +92,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu /** * Generates key from operator id and window id to store unique operator * checkpoints - * + * * @param operatorId * @param windowId * @return unique key for store @@ -104,14 +104,14 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu /** * Stores the given operator object in configured store - * + * * @param object * Operator object to store * @param operatorId * of operator * @param windowId * window id of operator to checkpoint - * + * */ @Override public void save(Object object, int operatorId, long windowId) throws IOException @@ -136,7 +136,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu /** * Retrieves the operator object for given operator & window from configured * store - * + * * @param operatorId * of operator * @param windowId @@ -167,7 +167,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu /** * Removes stored operator object for given operatorId & windowId from store - * + * */ @Override public void delete(int operatorId, long windowId) throws IOException @@ -189,7 +189,7 @@ public abstract class AbstractKeyValueStorageAgent<S extends StorageAgentKeyValu /** * Returns list window id for given operator id for which operator objects are * stored but not removed - * + * */ @Override public long[] getWindowIds(int operatorId) throws IOException http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java b/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java index 6deed74..5477f4a 100644 --- a/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java +++ b/library/src/main/java/com/datatorrent/lib/util/StorageAgentKeyValueStore.java @@ -24,7 +24,7 @@ import com.datatorrent.lib.db.KeyValueStore; /** * Interface for KeyValue store - * + * * * @since 3.4.0 */ @@ -33,7 +33,7 @@ public interface StorageAgentKeyValueStore extends KeyValueStore /** * Get all the keys associated with key - * + * * @param key * @return the list of all associated keys */ @@ -41,10 +41,10 @@ public interface StorageAgentKeyValueStore extends KeyValueStore /** * Set table/region name of store - * + * * @param tableName */ public void setTableName(String tableName); - - + + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/TableInfo.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/TableInfo.java b/library/src/main/java/com/datatorrent/lib/util/TableInfo.java index b0d454d..52bf117 100644 --- a/library/src/main/java/com/datatorrent/lib/util/TableInfo.java +++ b/library/src/main/java/com/datatorrent/lib/util/TableInfo.java @@ -66,6 +66,6 @@ public class TableInfo<T extends FieldInfo> { this.fieldsInfo = fieldsInfo; } - - + + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/TopNSort.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/TopNSort.java b/library/src/main/java/com/datatorrent/lib/util/TopNSort.java index ba9cb01..042c75b 100644 --- a/library/src/main/java/com/datatorrent/lib/util/TopNSort.java +++ b/library/src/main/java/com/datatorrent/lib/util/TopNSort.java @@ -136,7 +136,7 @@ public class TopNSort<E> if (list.isEmpty()) { return list; } - + Collections.reverse(list); return list; //return ret; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/com/datatorrent/lib/util/package-info.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/package-info.java b/library/src/main/java/com/datatorrent/lib/util/package-info.java index f65e415..7b1140c 100644 --- a/library/src/main/java/com/datatorrent/lib/util/package-info.java +++ b/library/src/main/java/com/datatorrent/lib/util/package-info.java @@ -17,7 +17,7 @@ * under the License. */ /** - * Library of shared operators and utilities. + * Library of shared operators and utilities. */ @org.apache.hadoop.classification.InterfaceStability.Evolving package com.datatorrent.lib.util; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java index 5509ba0..7763103 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dedup/BoundedDedupOperator.java @@ -42,17 +42,17 @@ import com.datatorrent.netlet.util.Slice; * An implementation for {@link AbstractDeduper} which handles the case of bounded data set. * This implementation assumes that the incoming tuple does not have a time field, and the de-duplication * is to be strictly based on the key of the tuple. - * + * * This implementation uses {@link ManagedTimeStateImpl} for storing the tuple keys on the persistent storage. - * + * * Following properties need to be configured for the functioning of the operator: * 1. {@link #keyExpression}: The java expression to extract the key fields in the incoming tuple (POJO) - * 2. {@link #numBuckets} (optional): The number of buckets that need to be used for storing the keys of the + * 2. {@link #numBuckets} (optional): The number of buckets that need to be used for storing the keys of the * incoming tuples. - * NOTE: Users can decide upon the proper value for this parameter by guessing the number of distinct keys + * NOTE: Users can decide upon the proper value for this parameter by guessing the number of distinct keys * in the application. A appropriate value would be sqrt(num distinct keys). In case, the number of distinct keys is a * huge number, leave it blank so that the default value of 46340 will be used. The rationale for using this number is - * that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be equal to the size of + * that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be equal to the size of * each bucket, thus spreading the load equally among each bucket. * * @@ -194,10 +194,10 @@ public class BoundedDedupOperator extends AbstractDeduper<Object> /** * Sets the number of buckets - * NOTE: Users can decide upon the proper value for this parameter by guessing the number of distinct keys + * NOTE: Users can decide upon the proper value for this parameter by guessing the number of distinct keys * in the application. A appropriate value would be sqrt(num distinct keys). In case, the number of distinct keys is a * huge number, leave it blank so that the default value of 46340 will be used. The rationale for using this number is - * that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be equal to the size of + * that sqrt(max integer) = 46340. This implies that the number of buckets used will roughly be equal to the size of * each bucket, thus spreading the load equally among each bucket. * @param numBuckets the number of buckets */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java index f7ab25d..52ef811 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregator.java @@ -44,9 +44,9 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator //protected int embedAggregatorID; protected Set<Integer> embedAggregatorDdIds = Sets.newHashSet(); protected Set<String> fields = Sets.newHashSet(); - + protected DimensionsConversionContext dimensionsConversionContext; - + public DimensionsConversionContext getDimensionsConversionContext() { return dimensionsConversionContext; @@ -63,7 +63,7 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator this.setDimensionsConversionContext(dimensionsConversionContext); return this; } - + public String getEmbedAggregatorName() { return embedAggregatorName; @@ -96,7 +96,7 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator { this.dimensionDescriptorID = dimensionDescriptorID; } - + @Override public int getAggregatorID() { @@ -118,7 +118,7 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator { this.aggregateDescriptor = aggregateDescriptor; } - + @Override public Set<String> getFields() { @@ -155,7 +155,7 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator { embedAggregatorDdIds.addAll(ddids); } - + /** * bright: TODO: check */ @@ -164,5 +164,5 @@ public abstract class AbstractCompositeAggregator implements CompositeAggregator { return null; } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java index 8156064..5cf4582 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractCompositeAggregatorFactory.java @@ -28,7 +28,7 @@ public abstract class AbstractCompositeAggregatorFactory implements CompositeAgg protected static final String NAME_TEMPLATE = "%s-%s-%s"; protected static final String PROPERTY_SEPERATOR = "_"; protected static final String PROPERTY_VALUE_SEPERATOR = "|"; - + @Override public String getCompositeAggregatorName(String aggregatorType, String embededAggregatorName, Map<String, Object> properties) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java index bf2e342..bf2054e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractIncrementalAggregator.java @@ -48,7 +48,7 @@ import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; * the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields, * one for cost and one for revenue. * </p> - * + * * * @since 3.4.0 */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java index 41d1372..e38ea0e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AbstractTopBottomAggregator.java @@ -43,7 +43,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre public static final String PROP_COUNT = "count"; protected int count; protected SortedSet<String> subCombinations = Sets.newTreeSet(); - + public AbstractTopBottomAggregator withEmbedAggregatorName(String embedAggregatorName) { this.setEmbedAggregatorName(embedAggregatorName); @@ -55,7 +55,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre this.setSubCombinations(subCombinations); return this; } - + public AbstractTopBottomAggregator withCount(int count) { this.setCount(count); @@ -71,7 +71,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre { this.count = count; } - + public void setSubCombinations(Set<String> subCombinations) { this.subCombinations.clear(); @@ -91,11 +91,12 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre /** * TOP/BOTTOM return a list of value */ + @Override public Type getOutputType() { return Type.OBJECT; } - + @Override public int hashCode() { @@ -115,7 +116,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre if (getClass() != obj.getClass()) { return false; } - + AbstractTopBottomAggregator other = (AbstractTopBottomAggregator)obj; if (embedAggregatorName != other.embedAggregatorName && (embedAggregatorName == null || !embedAggregatorName.equals(other.embedAggregatorName))) { @@ -131,8 +132,8 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre return true; } - - + + /** * The result keep a list of object for each aggregate value * The value of resultAggregate should keep a list of inputEventKey(the value can be get from cache or load) or a map @@ -149,7 +150,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre //there are problem for composite's value field descriptor, just ignore now. GPOMutable resultGpo = resultAggregate.getAggregates(); final List<String> compositeFieldList = resultAggregate.getEventKey().getKey().getFieldDescriptor().getFieldList(); - + //Map<EventKey, Aggregate> existedSubEventKeyToAggregate = Maps.newHashMap(); for (String valueField : resultGpo.getFieldDescriptor().getFieldList()) { //the resultGpo keep a list of sub aggregates @@ -168,7 +169,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre /** * get store map key from the eventKey - * + * * @param eventKey * @return */ @@ -183,16 +184,16 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre key.append(subEventKey.getKey().getField(field)).append(KEY_VALUE_SEPERATOR); } key.deleteCharAt(key.length() - 1); - + return key.toString(); } - + /** - * update existed sub aggregate. + * update existed sub aggregate. * The sub aggregates which kept in composite aggregate as candidate could be changed. synchronize the value with * input aggregates. - * + * * @param resultAggregate * @param valueField * @param inputSubEventKeys @@ -218,7 +219,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre } } } - + /** * need a map of value field from the inputGpo to resultGpo, use the index of Fields as the index * @param resultGpo @@ -241,7 +242,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre fieldToType.get(aggregateField)); } } - + /** * seperate it in case sub class override it. * @param fieldName @@ -252,7 +253,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre { return Maps.newHashMap(); } - + /** * compare the result(resultMap) with input(inputFieldName, inputFieldValue) * @param resultMap @@ -275,7 +276,7 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre } } - + /** * shoud the result element replaced by input element. * the inputElement and resultElement should be same type @@ -299,11 +300,11 @@ public abstract class AbstractTopBottomAggregator extends AbstractCompositeAggre int compareResult = ((Comparable<Object>)resultElement).compareTo(inputElement); return shouldReplaceResultElement(compareResult); } - + //handle other cases throw new RuntimeException("Should NOT come here."); - + } - + protected abstract boolean shouldReplaceResultElement(int resultCompareToInput); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java index 6482c3b..85f1822 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorRegistry.java @@ -108,9 +108,9 @@ public class AggregatorRegistry implements Serializable * {@link IncrementalAggregator} to the corresponding {@link IncrementalAggregator}. */ private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator; - + protected transient Map<Integer, AbstractTopBottomAggregator> topBottomAggregatorIDToAggregator; - + /** * This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link IncrementalAggregator}. */ @@ -119,19 +119,19 @@ public class AggregatorRegistry implements Serializable * This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}. */ private Map<String, OTFAggregator> nameToOTFAggregator; - + /** * the map from TOPN and BOTTOM aggregator to name */ private Map<String, AbstractTopBottomAggregator> nameToTopBottomAggregator = Maps.newHashMap(); - + /** * This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}. */ private Map<String, Integer> incrementalAggregatorNameToID; - + protected Map<String, Integer> topBottomAggregatorNameToID = Maps.newHashMap(); - + protected static Set<String> topBottomAggregatorNames; @@ -269,12 +269,12 @@ public class AggregatorRegistry implements Serializable Preconditions.checkNotNull(entry.getKey()); Preconditions.checkNotNull(entry.getValue()); } - + for (Map.Entry<String, Integer> entry : topBottomAggregatorNameToID.entrySet()) { Preconditions.checkNotNull(entry.getKey()); Preconditions.checkNotNull(entry.getValue()); } - + for (Map.Entry<String, AbstractTopBottomAggregator> entry : nameToTopBottomAggregator.entrySet()) { Preconditions.checkNotNull(entry.getKey()); Preconditions.checkNotNull(entry.getValue()); @@ -337,7 +337,7 @@ public class AggregatorRegistry implements Serializable nameToTopBottomAggregator.get(aggregatorName)); } } - + /** * This is a helper method which sets and validated the given mapping from an {@link IncrementalAggregator}'s name * to an {@link IncrementalAggregator}. @@ -375,7 +375,7 @@ public class AggregatorRegistry implements Serializable nameToOTFAggregator.containsKey(aggregatorName)) { return true; } - + //the composite probably send whole aggregator name String aggregatorType = aggregatorName.split("-")[0]; return (AggregatorTopBottomType.valueOf(aggregatorType) != null); @@ -399,12 +399,12 @@ public class AggregatorRegistry implements Serializable { return nameToOTFAggregator.containsKey(aggregatorName); } - + public boolean isTopBottomAggregatorType(String aggregatorType) { return (AggregatorTopBottomType.valueOf(aggregatorType) != null); } - + /** * Gets the mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}. * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java index d9ad83d..006cadf 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java @@ -164,5 +164,5 @@ public final class AggregatorUtils return new FieldsDescriptor(fieldToType, fieldToSerde); } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java index 916467d..e64e957 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregator.java @@ -37,13 +37,13 @@ public interface CompositeAggregator public int getDimensionDescriptorID(); public int getAggregatorID(); - + public Set<Integer> getEmbedAggregatorDdIds(); - + public Set<String> getFields(); public FieldsDescriptor getAggregateDescriptor(); - + public FieldsDescriptor getMetaDataDescriptor(); /** @@ -52,9 +52,9 @@ public interface CompositeAggregator * @return The output type of the {@link CompositeAggregator}. */ public Type getOutputType(); - + /** - * + * * @param resultAggregate the aggregate to put the result * @param inputEventKeys The input(incremental) event keys, used to locate the input aggregates * @param inputAggregatesRepo: the map of the EventKey to Aggregate keep the super set of aggregate required http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java index da1d225..18682d0 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/CompositeAggregatorFactory.java @@ -35,7 +35,7 @@ public interface CompositeAggregatorFactory * @return */ //public boolean isValidCompositeAggregatorName(String aggregatorName); - + /** * get composite aggregator name based on composite aggregator information * @param aggregatorType @@ -45,7 +45,7 @@ public interface CompositeAggregatorFactory */ public String getCompositeAggregatorName(String aggregatorType, String embedAggregatorName, Map<String, Object> properties); - + /** * create composite aggregator name based on composite aggregator information * @param aggregatorType http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java index 125c3f1..a3a148a 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/DefaultCompositeAggregatorFactory.java @@ -25,7 +25,7 @@ import com.google.common.collect.Maps; /** * The DefaultCompositeAggregatorFactory find the specific factory according to the aggregator type * and delegate to the specific factory. - * + * * * @since 3.4.0 */ @@ -34,9 +34,9 @@ public class DefaultCompositeAggregatorFactory implements CompositeAggregatorFac public static final DefaultCompositeAggregatorFactory defaultInst = new DefaultCompositeAggregatorFactory() .addFactory(AggregatorTopBottomType.TOPN.name(), TopBottomAggregatorFactory.defaultInstance) .addFactory(AggregatorTopBottomType.BOTTOMN.name(), TopBottomAggregatorFactory.defaultInstance); - + protected Map<String, CompositeAggregatorFactory> factoryRepository = Maps.newHashMap(); - + @Override public String getCompositeAggregatorName(String aggregatorType, String embedAggregatorName, Map<String, Object> properties) @@ -57,7 +57,7 @@ public class DefaultCompositeAggregatorFactory implements CompositeAggregatorFac { return factoryRepository.get(aggregatorType); } - + public DefaultCompositeAggregatorFactory addFactory(String aggregatorType, CompositeAggregatorFactory factory) { factoryRepository.put(aggregatorType, factory); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java index 89f6bb7..8843b4e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/TopBottomAggregatorFactory.java @@ -33,7 +33,7 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto public static final String PROPERTY_NAME_SUB_COMBINATIONS = "subCombinations"; public static final TopBottomAggregatorFactory defaultInstance = new TopBottomAggregatorFactory(); - + @Override public <T> AbstractTopBottomAggregator createCompositeAggregator(String aggregatorType, String embedAggregatorName, Map<String, Object> properties) @@ -41,7 +41,7 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto return createTopBottomAggregator(aggregatorType, embedAggregatorName, getCount(properties), getSubCombinations(properties)); } - + public <T> AbstractTopBottomAggregator createTopBottomAggregator(String aggregatorType, String embedAggregatorName, int count, String[] subCombinations) { @@ -58,7 +58,7 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto aggregator.setEmbedAggregatorName(embedAggregatorName); aggregator.setCount(count); aggregator.setSubCombinations(subCombinations); - + return aggregator; } @@ -66,12 +66,12 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto { return Integer.valueOf((String)properties.get(PROPERTY_NAME_COUNT)); } - + protected String[] getSubCombinations(Map<String, Object> properties) { return (String[])properties.get(PROPERTY_NAME_SUB_COMBINATIONS); } - + /** * The properties of TOP or BOTTOM are count and subCombinations. * count only have one value and subCombinations is a set of string, we can order combinations to simplify the name @@ -82,13 +82,13 @@ public class TopBottomAggregatorFactory extends AbstractCompositeAggregatorFacto StringBuilder sb = new StringBuilder(); String count = (String)properties.get(PROPERTY_NAME_COUNT); sb.append(count).append(PROPERTY_SEPERATOR); - + String[] subCombinations = (String[])properties.get(PROPERTY_NAME_SUB_COMBINATIONS); Set<String> sortedSubCombinations = Sets.newTreeSet(); for (String subCombination : subCombinations) { sortedSubCombinations.add(subCombination); } - + for (String subCombination : sortedSubCombinations) { sb.append(subCombination).append(PROPERTY_SEPERATOR); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java index 31f35aa..268c51b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReader.java @@ -32,7 +32,7 @@ import com.datatorrent.lib.io.block.ReaderContext; * This operator can be used for reading records/tuples from Filesystem in * parallel (without ordering guarantees between tuples). Records can be * delimited (e.g. newline) or fixed width records. Output tuples are byte[]. - * + * * Typically, this operator will be connected to output of FileSplitterInput to * read records in parallel. * @@ -106,7 +106,7 @@ public class FSRecordReader extends FSSliceReader /** * Criteria for record split - * + * * @param mode * Mode */ @@ -117,7 +117,7 @@ public class FSRecordReader extends FSSliceReader /** * Criteria for record split - * + * * @return mode */ public RECORD_READER_MODE getMode() @@ -127,7 +127,7 @@ public class FSRecordReader extends FSSliceReader /** * Length for fixed width record - * + * * @param recordLength */ public void setRecordLength(int recordLength) @@ -140,7 +140,7 @@ public class FSRecordReader extends FSSliceReader /** * Length for fixed width record - * + * * @return record length */ public int getRecordLength() http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java index 0a9b321..d508320 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/fs/FSRecordReaderModule.java @@ -42,7 +42,7 @@ import com.datatorrent.lib.io.fs.FileSplitterInput; * (Ordering is not guaranteed when records are read in parallel) * * Input directory is scanned at specified interval to poll for new data. - * + * * The module reads data in parallel, following parameters can be configured * <br/> * 1. files: list of file(s)/directories to read<br/> @@ -91,7 +91,7 @@ public class FSRecordReaderModule implements Module /** * Creates an instance of FileSplitter - * + * * @return */ public FileSplitterInput createFileSplitter() @@ -101,7 +101,7 @@ public class FSRecordReaderModule implements Module /** * Creates an instance of Record Reader - * + * * @return FSRecordReader instance */ public FSRecordReader createRecordReader() @@ -233,7 +233,7 @@ public class FSRecordReaderModule implements Module /** * Gets readers count - * + * * @return readersCount */ public int getReadersCount() @@ -243,7 +243,7 @@ public class FSRecordReaderModule implements Module /** * Static count of readers to read input file - * + * * @param readersCount */ public void setReadersCount(int readersCount) @@ -276,7 +276,7 @@ public class FSRecordReaderModule implements Module * Sets number of blocks to be emitted per window.<br/> * A lot of blocks emitted per window can overwhelm the downstream operators. * Set this value considering blockSize and readersCount. - * + * * @param threshold */ public void setBlocksThreshold(int threshold) @@ -288,7 +288,7 @@ public class FSRecordReaderModule implements Module * Gets number of blocks to be emitted per window.<br/> * A lot of blocks emitted per window can overwhelm the downstream operators. * Set this value considering blockSize and readersCount. - * + * * @return */ public int getBlocksThreshold() @@ -298,7 +298,7 @@ public class FSRecordReaderModule implements Module /** * Criteria for record split - * + * * @return mode */ public RECORD_READER_MODE getMode() @@ -308,7 +308,7 @@ public class FSRecordReaderModule implements Module /** * Criteria for record split - * + * * @param mode * Mode */ @@ -319,7 +319,7 @@ public class FSRecordReaderModule implements Module /** * Length for fixed width record - * + * * @return record length */ public int getRecordLength() @@ -329,7 +329,7 @@ public class FSRecordReaderModule implements Module /** * Length for fixed width record - * + * * @param recordLength */ public void setRecordLength(int recordLength) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java index 237f4b9..3b01ed2 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java @@ -193,7 +193,7 @@ public class IncrementalCheckpointManager extends FSWindowDataManager Map<Long, Object> artifactPerWindow = new HashMap<>(); FileSystemWAL.FileSystemWALReader reader = getWal().getReader(); reader.seek(getWal().getWalStartPointer()); - + Slice windowSlice = readNext(reader); while (reader.getCurrentPointer().compareTo(getWal().getWalEndPointerAfterRecovery()) < 0 && windowSlice != null) { long window = Longs.fromByteArray(windowSlice.toByteArray()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java index b12f119..52dfac7 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java @@ -117,13 +117,13 @@ public class TimeBasedPriorityQueue<T> } else if (this.timestamp > timeWrapper.getTimestamp()) { return 1; } - + /** * NOTE: the following use the equals() to implement the compareTo() for key. - * it should be OK as the compareTo() only used by TimeBasedPriorityQueue.sortedTimestamp, + * it should be OK as the compareTo() only used by TimeBasedPriorityQueue.sortedTimestamp, * which only care about the order of time ( the order for key doesn't matter ). * But would cause problem if add other function which depended on the order of the key. - * + * * Add compare by hashCode when not equals in order to compatible with the interface for most cases. * Anyway, the order of key is not guaranteed. And we should not return 0 if not equals */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java index 2b85580..6e8774e 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java @@ -134,7 +134,7 @@ public class FSWindowDataManager implements WindowDataManager * Used by {@link IncrementalCheckpointManager} */ private boolean relyOnCheckpoints; - + private transient long largestCompletedWindow = Stateless.WINDOW_ID; private final FSWindowReplayWAL wal = new FSWindowReplayWAL(); @@ -303,7 +303,7 @@ public class FSWindowDataManager implements WindowDataManager long lastWindow = Stateless.WINDOW_ID; Slice slice = readNext(reader); - + while (slice != null) { boolean skipComplete = skipNext(reader); //skip the artifact because we need just the largest window id. if (!skipComplete) { @@ -311,7 +311,7 @@ public class FSWindowDataManager implements WindowDataManager break; } long offset = reader.getCurrentPointer().getOffset(); - + long window = Longs.fromByteArray(slice.toByteArray()); if (ceilingWindow != null && window > ceilingWindow) { break; @@ -393,7 +393,7 @@ public class FSWindowDataManager implements WindowDataManager } } } - + /** * Save writes 2 entries to the wal: <br/> * <ol> @@ -481,7 +481,7 @@ public class FSWindowDataManager implements WindowDataManager wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum()); wal.retrievedWindow = readNext(reader); //null or next window - + return fromSlice(data); } else if (windowId < currentWindow) { //no artifact saved corresponding to that window and artifact is not read. @@ -500,7 +500,7 @@ public class FSWindowDataManager implements WindowDataManager } return null; } - + /** * Deletes artifacts for all windows less than equal to committed window id.<p/> * http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java index 74ca929..d848804 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java @@ -73,12 +73,12 @@ public class FSWindowReplayWAL extends FileSystemWAL throw new RuntimeException("while setup"); } } - + public FileSystemWALPointer getWalEndPointerAfterRecovery() { return walEndPointerAfterRecovery; } - + /** * Finalizes files just after rotation. Doesn't wait for the window to be committed. */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java index b7d5ba1..49f61a4 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java @@ -163,7 +163,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil { return filePath + "_" + partNumber; } - + /** * @return the wal start pointer */ http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java index f0a66a4..af623f3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java @@ -33,7 +33,7 @@ public class Average implements Accumulation<Double, MutablePair<Double, Long>, { return new MutablePair<>(0.0, 0L); } - + @Override public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input) { @@ -41,7 +41,7 @@ public class Average implements Accumulation<Double, MutablePair<Double, Long>, accu.setRight(accu.getRight() + 1); return accu; } - + @Override public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2) { @@ -50,13 +50,13 @@ public class Average implements Accumulation<Double, MutablePair<Double, Long>, accu1.setRight(accu1.getRight() + accu2.getRight()); return accu1; } - + @Override public Double getOutput(MutablePair<Double, Long> accumulatedValue) { return accumulatedValue.getLeft(); } - + @Override public Double getRetraction(Double value) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java index f2affd1..d217ce9 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java @@ -35,27 +35,27 @@ public class Group<T> implements Accumulation<T, List<T>, List<T>> { return new ArrayList<>(); } - + @Override public List<T> accumulate(List<T> accumulatedValue, T input) { accumulatedValue.add(input); return accumulatedValue; } - + @Override public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2) { accumulatedValue1.addAll(accumulatedValue2); return accumulatedValue1; } - + @Override public List<T> getOutput(List<T> accumulatedValue) { return accumulatedValue; } - + @Override public List<T> getRetraction(List<T> value) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java index 64ff0c4..92aec18 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java @@ -28,20 +28,20 @@ import org.apache.apex.malhar.lib.window.Accumulation; */ public class Max<T> implements Accumulation<T, T, T> { - + Comparator<T> comparator; - + public void setComparator(Comparator<T> comparator) { this.comparator = comparator; } - + @Override public T defaultAccumulatedValue() { return null; } - + @Override public T accumulate(T accumulatedValue, T input) { @@ -55,19 +55,19 @@ public class Max<T> implements Accumulation<T, T, T> throw new RuntimeException("Tuple cannot be compared"); } } - + @Override public T merge(T accumulatedValue1, T accumulatedValue2) { return accumulate(accumulatedValue1, accumulatedValue2); } - + @Override public T getOutput(T accumulatedValue) { return accumulatedValue; } - + @Override public T getRetraction(T value) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java index 48017a7..2b6247a 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java @@ -28,20 +28,20 @@ import org.apache.apex.malhar.lib.window.Accumulation; */ public class Min<T> implements Accumulation<T, T, T> { - + Comparator<T> comparator; - + public void setComparator(Comparator<T> comparator) { this.comparator = comparator; } - + @Override public T defaultAccumulatedValue() { return null; } - + @Override public T accumulate(T accumulatedValue, T input) { @@ -55,19 +55,19 @@ public class Min<T> implements Accumulation<T, T, T> throw new RuntimeException("Tuple cannot be compared"); } } - + @Override public T merge(T accumulatedValue1, T accumulatedValue2) { return accumulate(accumulatedValue1, accumulatedValue2); } - + @Override public T getOutput(T accumulatedValue) { return accumulatedValue; } - + @Override public T getRetraction(T value) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java index 2548f72..53f3534 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java @@ -38,14 +38,14 @@ public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>> { return new HashSet<>(); } - + @Override public Set<T> accumulate(Set<T> accumulatedValue, T input) { accumulatedValue.add(input); return accumulatedValue; } - + @Override public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2) { @@ -54,7 +54,7 @@ public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>> } return accumulatedValue1; } - + @Override public List<T> getOutput(Set<T> accumulatedValue) { @@ -64,7 +64,7 @@ public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>> return new ArrayList<>(accumulatedValue); } } - + @Override public List<T> getRetraction(List<T> value) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java index 11ab2ab..475d653 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java @@ -33,27 +33,27 @@ public class SumDouble implements Accumulation<Double, MutableDouble, Double> { return new MutableDouble(0.0); } - + @Override public MutableDouble accumulate(MutableDouble accumulatedValue, Double input) { accumulatedValue.add(input); return accumulatedValue; } - + @Override public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2) { accumulatedValue1.add(accumulatedValue2); return accumulatedValue1; } - + @Override public Double getOutput(MutableDouble accumulatedValue) { return accumulatedValue.doubleValue(); } - + @Override public Double getRetraction(Double value) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java index d11bec3..dff3be6 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java @@ -33,27 +33,27 @@ public class SumFloat implements Accumulation<Float, MutableFloat, Float> { return new MutableFloat(0.); } - + @Override public MutableFloat accumulate(MutableFloat accumulatedValue, Float input) { accumulatedValue.add(input); return accumulatedValue; } - + @Override public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2) { accumulatedValue1.add(accumulatedValue2); return accumulatedValue1; } - + @Override public Float getOutput(MutableFloat accumulatedValue) { return accumulatedValue.floatValue(); } - + @Override public Float getRetraction(Float value) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java index cf0c50e..dca67a4 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java @@ -33,27 +33,27 @@ public class SumInt implements Accumulation<Integer, MutableInt, Integer> { return new MutableInt(0); } - + @Override public MutableInt accumulate(MutableInt accumulatedValue, Integer input) { accumulatedValue.add(input); return accumulatedValue; } - + @Override public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2) { accumulatedValue1.add(accumulatedValue2); return accumulatedValue1; } - + @Override public Integer getOutput(MutableInt accumulatedValue) { return accumulatedValue.intValue(); } - + @Override public Integer getRetraction(Integer value) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java index 55908f5..027e4f8 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java @@ -33,27 +33,27 @@ public class SumLong implements Accumulation<Long, MutableLong, Long> { return new MutableLong(0L); } - + @Override public MutableLong accumulate(MutableLong accumulatedValue, Long input) { accumulatedValue.add(input); return accumulatedValue; } - + @Override public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) { accumulatedValue1.add(accumulatedValue2); return accumulatedValue1; } - + @Override public Long getOutput(MutableLong accumulatedValue) { return accumulatedValue.longValue(); } - + @Override public Long getRetraction(Long value) { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java index 779b0f0..6db0557 100644 --- a/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java +++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/DTBCFile.java @@ -47,18 +47,18 @@ import org.apache.hadoop.io.file.tfile.Compression.Algorithm; /** - * + * * <ul> - * <li>The file format of DTFile is same as {@link TFile} with different reader implementation. + * <li>The file format of DTFile is same as {@link TFile} with different reader implementation. * It reads data block by block and cache the binary block data into memory to speed up the random read. - * - * <li>The public api of {@link Reader} is as same as it is in {@link TFile} {@link org.apache.hadoop.io.file.tfile.TFile.Reader} implementation. + * + * <li>The public api of {@link Reader} is as same as it is in {@link TFile} {@link org.apache.hadoop.io.file.tfile.TFile.Reader} implementation. * Besides, it provides getBlockBuffer(), getKeyOffset(), getKeyLength(), getValueOffset(), getValueLength() method * to expose raw block, key, value data to user to avoid unnecessary internal/external data copy - * + * * <li>In the performance test, It shows no difference in sequential reads and 20x faster in random reads(If most of them hit memory) * </ul> - * + * * Block Compressed file, the underlying physical storage layer for TFile. * BCFile provides the basic block level compression for the data block and meta * blocks. It is separated from TFile as it may be used for other @@ -102,7 +102,7 @@ final class DTBCFile { private static interface BlockRegister { /** * Register a block that is fully closed. - * + * * @param raw * The size of block in terms of uncompressed bytes. * @param offsetStart @@ -156,7 +156,7 @@ final class DTBCFile { /** * Get the output stream for BlockAppender's consumption. - * + * * @return the output stream suitable for writing block data. */ OutputStream getOutputStream() { @@ -165,7 +165,7 @@ final class DTBCFile { /** * Get the current position in file. - * + * * @return The current byte offset in underlying file. * @throws IOException */ @@ -179,7 +179,7 @@ final class DTBCFile { /** * Current size of compressed data. - * + * * @return * @throws IOException */ @@ -206,7 +206,7 @@ final class DTBCFile { /** * Access point to stuff data into a block. - * + * * TODO: Change DataOutputStream to something else that tracks the size as * long instead of int. Currently, we will wrap around if the row block size * is greater than 4GB. @@ -219,7 +219,7 @@ final class DTBCFile { /** * Constructor - * + * * @param register * the block register, which is called when the block is closed. * @param wbs @@ -233,7 +233,7 @@ final class DTBCFile { /** * Get the raw size of the block. - * + * * @return the number of uncompressed bytes written through the * BlockAppender so far. * @throws IOException @@ -248,7 +248,7 @@ final class DTBCFile { /** * Get the compressed size of the block in progress. - * + * * @return the number of compressed bytes written to the underlying FS * file. The size may be smaller than actual need to compress the * all data written due to internal buffering inside the @@ -289,7 +289,7 @@ final class DTBCFile { /** * Constructor - * + * * @param fout * FS output stream. * @param compressionName @@ -383,7 +383,7 @@ final class DTBCFile { * block. There can only be one BlockAppender stream active at any time. * Regular Blocks may not be created after the first Meta Blocks. The caller * must call BlockAppender.close() to conclude the block creation. - * + * * @param name * The name of the Meta Block. The name must not conflict with * existing Meta Blocks. @@ -407,7 +407,7 @@ final class DTBCFile { * active at any time. Regular Blocks may not be created after the first * Meta Blocks. The caller must call BlockAppender.close() to conclude the * block creation. - * + * * @param name * The name of the Meta Block. The name must not conflict with * existing Meta Blocks. @@ -426,7 +426,7 @@ final class DTBCFile { * block. There can only be one BlockAppender stream active at any time. * Data Blocks may not be created after the first Meta Blocks. The caller * must call BlockAppender.close() to conclude the block creation. - * + * * @return The BlockAppender stream * @throws IOException */ @@ -474,7 +474,7 @@ final class DTBCFile { /** * Callback to make sure a data block is added to the internal list when * it's being closed. - * + * */ private class DataBlockRegister implements BlockRegister { DataBlockRegister() { @@ -545,7 +545,7 @@ final class DTBCFile { /** * Get the output stream for BlockAppender's consumption. - * + * * @return the output stream suitable for writing block data. */ public ReusableByteArrayInputStream getInputStream() { @@ -579,7 +579,7 @@ final class DTBCFile { public static class BlockReader extends DataInputStream { private final RBlockState rBlkState; private boolean closed = false; - + private ReusableByteArrayInputStream wrappedInputStream = null; BlockReader(RBlockState rbs) { @@ -607,7 +607,7 @@ final class DTBCFile { /** * Get the name of the compression algorithm used to compress the block. - * + * * @return name of the compression algorithm. */ public String getCompressionName() { @@ -616,7 +616,7 @@ final class DTBCFile { /** * Get the uncompressed size of the block. - * + * * @return uncompressed size of the block. */ public long getRawSize() { @@ -625,7 +625,7 @@ final class DTBCFile { /** * Get the compressed size of the block. - * + * * @return compressed size of the block. */ public long getCompressedSize() { @@ -634,7 +634,7 @@ final class DTBCFile { /** * Get the starting position of the block in the file. - * + * * @return the starting position of the block in the file. */ public long getStartPos() { @@ -646,7 +646,7 @@ final class DTBCFile { closed = false; rBlkState.renew(); } - + public ReusableByteArrayInputStream getBlockDataInputStream() { return wrappedInputStream; @@ -655,7 +655,7 @@ final class DTBCFile { /** * Constructor - * + * * @param fin * FS input stream. * @param fileLength @@ -696,7 +696,7 @@ final class DTBCFile { /** * Get the name of the default compression algorithm. - * + * * @return the name of the default compression algorithm. */ public String getDefaultCompressionName() { @@ -705,7 +705,7 @@ final class DTBCFile { /** * Get version of BCFile file being read. - * + * * @return version of BCFile file being read. */ public Version getBCFileVersion() { @@ -714,7 +714,7 @@ final class DTBCFile { /** * Get version of BCFile API. - * + * * @return version of BCFile API. */ public Version getAPIVersion() { @@ -733,7 +733,7 @@ final class DTBCFile { /** * Get the number of data blocks. - * + * * @return the number of data blocks. */ public int getBlockCount() { @@ -742,7 +742,7 @@ final class DTBCFile { /** * Stream access to a Meta Block. - * + * * @param name * meta block name * @return BlockReader input stream for reading the meta block. @@ -763,7 +763,7 @@ final class DTBCFile { /** * Stream access to a Data Block. - * + * * @param blockIndex * 0-based data block index. * @return BlockReader input stream for reading the data block. @@ -797,7 +797,7 @@ final class DTBCFile { /** * Find the smallest Block index whose starting offset is greater than or * equal to the specified offset. - * + * * @param offset * User-specific offset. * @return the index to the data Block if such block exists; or -1 http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java index cb559dc..d9c483e 100644 --- a/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java +++ b/library/src/main/java/org/apache/hadoop/io/file/tfile/ReusableByteArrayInputStream.java @@ -28,9 +28,9 @@ import java.io.ByteArrayInputStream; */ public class ReusableByteArrayInputStream extends ByteArrayInputStream { - + private final int initialOffset; - + private final int initialLength; public ReusableByteArrayInputStream(byte[] buf, int offset, int length) @@ -53,12 +53,12 @@ public class ReusableByteArrayInputStream extends ByteArrayInputStream count = initialLength; mark = 0; } - + public int getPos() { return pos; } - + public byte[] getBuf() { return buf; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java b/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java index 7f3061b..46222b1 100644 --- a/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java +++ b/library/src/test/java/com/datatorrent/lib/algo/BottomNUnifierTest.java @@ -32,13 +32,13 @@ public class BottomNUnifierTest @Test public void testUnifier() { - + // Instantiate unifier BottomNUnifier<String, Integer> oper = new BottomNUnifier<>(); oper.setN(2); CollectorTestSink sink = new CollectorTestSink(); oper.mergedport.setSink(sink); - + oper.beginWindow(1); ArrayList<Integer> values = new ArrayList<Integer>(); values.add(5); @@ -53,7 +53,7 @@ public class BottomNUnifierTest tuple.put("a", values); oper.process(tuple); oper.endWindow(); - + Assert.assertEquals("Tuples in sink", sink.collectedTuples.size(), 1); tuple = (HashMap<String, ArrayList<Integer>>)sink.collectedTuples.get(0); values = tuple.get("a"); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java b/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java index dd19d2b..0c45542 100644 --- a/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java +++ b/library/src/test/java/com/datatorrent/lib/converter/MapToKeyValuePairConverterTest.java @@ -30,25 +30,25 @@ import com.datatorrent.lib.util.TestUtils; public class MapToKeyValuePairConverterTest { @Test - public void MapToKeyValuePairConversion() + public void MapToKeyValuePairConversion() { MapToKeyValuePairConverter<String, Integer> testop = new MapToKeyValuePairConverter<String, Integer>(); Integer[] values = {1, 2, 3}; String[] keys = {"a", "b", "c"}; - + HashMap<String, Integer> inputMap = new HashMap<String, Integer>(); for (int i = 0; i < 3; i++) { inputMap.put(keys[i], values[i]); } - - CollectorTestSink<KeyValPair<String, Integer>> testsink = new CollectorTestSink<KeyValPair<String, Integer>>(); + + CollectorTestSink<KeyValPair<String, Integer>> testsink = new CollectorTestSink<KeyValPair<String, Integer>>(); TestUtils.setSink(testop.output, testsink); - + testop.beginWindow(0); - + testop.input.put(inputMap); - + testop.endWindow(); Assert.assertEquals(3,testsink.collectedTuples.size()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java b/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java index 8a5eed2..22e9f72 100644 --- a/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java +++ b/library/src/test/java/com/datatorrent/lib/converter/StringValueToNumberConverterForMapTest.java @@ -31,25 +31,25 @@ public class StringValueToNumberConverterForMapTest { @Test - public void testStringValueToNumericConversion() + public void testStringValueToNumericConversion() { StringValueToNumberConverterForMap<String> testop = new StringValueToNumberConverterForMap<String>(); String[] values = {"1.0", "2.0", "3.0"}; String[] keys = {"a", "b", "c"}; - + HashMap<String, String> inputMap = new HashMap<String, String>(); for (int i = 0; i < 3; i++) { inputMap.put(keys[i], values[i]); } - - CollectorTestSink<Map<String, Number>> testsink = new CollectorTestSink<Map<String, Number>>(); + + CollectorTestSink<Map<String, Number>> testsink = new CollectorTestSink<Map<String, Number>>(); TestUtils.setSink(testop.output, testsink); - + testop.beginWindow(0); - + testop.input.put(inputMap); - + testop.endWindow(); Assert.assertEquals(1,testsink.collectedTuples.size()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java b/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java index 335418a..61464dd 100644 --- a/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/cache/CacheStoreTest.java @@ -33,7 +33,7 @@ public class CacheStoreTest public void CacheStoreTest() { final Map<Object, Object> backupMap = Maps.newHashMap(); - + backupMap.put(1, "one"); backupMap.put(2, "two"); backupMap.put(3, "three"); @@ -44,7 +44,7 @@ public class CacheStoreTest backupMap.put(8, "eight"); backupMap.put(9, "nine"); backupMap.put(10, "ten"); - + CacheStore cs = new CacheStore(); cs.setMaxCacheSize(5); try { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java index 908f02f..5c398d2 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcIOAppTest.java @@ -121,7 +121,7 @@ public class JdbcIOAppTest lma.prepareDAG(new JdbcIOApp(), conf); LocalMode.Controller lc = lma.getController(); lc.runAsync(); - // wait for records to be added to table + // wait for records to be added to table Thread.sleep(3000); lc.shutdown(); Assert.assertEquals("Events in store", 10, getNumOfEventsInStore()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java index 1ffe256..ac17c2f 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java @@ -121,12 +121,12 @@ public class JdbcOperatorTest { this.startTimestamp = startTimestamp; } - + public double getScore() { return score; } - + public void setScore(double score) { this.score = score; @@ -225,7 +225,7 @@ public class JdbcOperatorTest pStmt.setDouble(6, new Double(55.4)); pStmt.executeUpdate(); } - + } catch (SQLException e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java index 1fe6484..567e27a 100644 --- a/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java +++ b/library/src/test/java/com/datatorrent/lib/filter/FilterTest.java @@ -175,7 +175,7 @@ public class FilterTest clearFilterOperator(); } - + @Test public void testOptionalExpressionFunctions() { @@ -183,7 +183,7 @@ public class FilterTest prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 1)"); Assert.assertEquals(6, filter.getExpressionFunctions().size()); } - + @Test public void testSetOptionalExpressionFunctionsItem() { @@ -191,8 +191,8 @@ public class FilterTest prepareFilterOperator(DummyPublicPOJO.class, "({$}.val == 1)"); Assert.assertEquals(6, filter.getExpressionFunctions().size()); } - - + + @Before public void setup() { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java index bb51ca4..c3e2cde 100644 --- a/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java +++ b/library/src/test/java/com/datatorrent/lib/formatter/XmlFormatterTest.java @@ -173,7 +173,7 @@ public class XmlFormatterTest + "</EmployeeBean>"; Assert.assertEquals(expected, validDataSink.collectedTuples.get(0)); } - + public static class DateAdapter extends XmlAdapter<String, Date> { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index e1f23d1..2f926d3 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -88,7 +88,7 @@ public class AbstractFileInputOperatorTest @Rule public TestMeta testMeta = new TestMeta(); - + @Test public void testSinglePartiton() throws Exception { http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java index 0fff870..03f3bf6 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java @@ -1422,7 +1422,7 @@ public class AbstractFileOutputOperatorTest Assert.assertEquals("Max length validation not thrown with -1 max length", true, error); } - + @Test public void testPeriodicRotation() { @@ -1439,7 +1439,7 @@ public class AbstractFileOutputOperatorTest for (int j = 0; j < i; ++j) { writer.input.put(2 * j + 1); } - writer.endWindow(); + writer.endWindow(); } writer.committed(29); Set<String> fileNames = new TreeSet<String>(); @@ -1543,7 +1543,7 @@ public class AbstractFileOutputOperatorTest // http://bugs.java.com/bugdatabase/view_bug.do?bug_id=4691425 List<Long> evenOffsets = new ArrayList<Long>(); List<Long> oddOffsets = new ArrayList<Long>(); - + writer.setFilePath(testMeta.getDir()); writer.setAlwaysWriteToTmp(false); writer.setup(testMeta.testOperatorContext); @@ -1633,7 +1633,7 @@ public class AbstractFileOutputOperatorTest throw new RuntimeException(e); } } - + int numWindows = 0; try { fis = new FileInputStream(file); @@ -1651,7 +1651,7 @@ public class AbstractFileOutputOperatorTest throw new RuntimeException(e); } } - + long startOffset = 0; for (long offset : offsets) { // Skip initial case in case file is not yet created @@ -1792,8 +1792,8 @@ public class AbstractFileOutputOperatorTest { counterStream = new CounterFilterOutputStream(outputStream); } - - public boolean isDoInit() + + public boolean isDoInit() { return (counterStream == null); } @@ -1809,7 +1809,7 @@ public class AbstractFileOutputOperatorTest { } - + public long getCounter() { if (isDoInit()) { @@ -1817,10 +1817,10 @@ public class AbstractFileOutputOperatorTest } else { return counterStream.getCounter(); } - + } } - + private static class CounterFilterOutputStream extends FilterOutputStream { long counter; @@ -1830,7 +1830,7 @@ public class AbstractFileOutputOperatorTest { super(out); } - + @Override public void write(int b) throws IOException { @@ -1869,5 +1869,5 @@ public class AbstractFileOutputOperatorTest return counter; } } - + } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java index e5193b6..17febf6 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractSingleFileOutputOperatorTest.java @@ -173,7 +173,7 @@ public class AbstractSingleFileOutputOperatorTest { writer.setOutputFileName(SINGLE_FILE); writer.setPartitionedFileNameformat(""); - + File meta = new File(testMeta.getDir()); writer.setFilePath(meta.getAbsolutePath()); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/763d14fc/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java index e1f57d9..e0ca9b6 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FastMergerDecisionMakerTest.java @@ -74,7 +74,7 @@ public class FastMergerDecisionMakerTest /** * If some block is missing then expect BlockNotFoundException. - * + * * @throws IOException * @throws BlockNotFoundException */ @@ -111,7 +111,7 @@ public class FastMergerDecisionMakerTest /** * All blocks are of same size which is same as default blockSize. Then fast * merge is possible - * + * * @throws IOException * @throws BlockNotFoundException */ @@ -126,7 +126,7 @@ public class FastMergerDecisionMakerTest * All blocks (except last block)are of same size which is same as default * blockSize. Last block is smaller than default blockSize Then fast merge is * possible - * + * * @throws IOException * @throws BlockNotFoundException */ @@ -141,7 +141,7 @@ public class FastMergerDecisionMakerTest /** * Some block other than last block is of different size. Then fast merge is * not possible - * + * * @throws IOException * @throws BlockNotFoundException */ @@ -156,7 +156,7 @@ public class FastMergerDecisionMakerTest /** * Some block other than last block is of different size. Then fast merge is * not possible - * + * * @throws IOException * @throws BlockNotFoundException */ @@ -171,7 +171,7 @@ public class FastMergerDecisionMakerTest /** * Some block other than last block is of different size. Then fast merge is * not possible - * + * * @throws IOException * @throws BlockNotFoundException */
