Repository: incubator-apex-malhar Updated Branches: refs/heads/master 79eeff782 -> c5cab8bd5
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMin.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMin.java new file mode 100644 index 0000000..1f53011 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorMin.java @@ -0,0 +1,266 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * This {@link IncrementalAggregator} takes the min of the fields provided in the {@link InputEvent}. + * + * @since 3.1.0 + */ +@Name("MIN") +public class AggregatorMin extends AbstractIncrementalAggregator +{ + private static final long serialVersionUID = 20154301648L; + + public AggregatorMin() + { + //Do nothing + } + + @Override + public Aggregate getGroup(InputEvent src, int aggregatorIndex) + { + Aggregate aggregate = super.getGroup(src, aggregatorIndex); + + GPOUtils.indirectCopy(aggregate.getAggregates(), src.getAggregates(), context.indexSubsetAggregates); + + return aggregate; + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset; + for (int index = 0; + index < destByte.length; + index++) { + byte tempVal = srcByte[srcIndices[index]]; + if (destByte[index] > tempVal) { + destByte[index] = tempVal; + } + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset; + for (int index = 0; + index < destShort.length; + index++) { + short tempVal = srcShort[srcIndices[index]]; + if (destShort[index] > tempVal) { + destShort[index] = 
tempVal; + } + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != null) { + int[] srcInteger = srcAggs.getFieldsInteger(); + int[] srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset; + for (int index = 0; + index < destInteger.length; + index++) { + int tempVal = srcInteger[srcIndices[index]]; + if (destInteger[index] > tempVal) { + destInteger[index] = tempVal; + } + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset; + for (int index = 0; + index < destLong.length; + index++) { + long tempVal = srcLong[srcIndices[index]]; + if (destLong[index] > tempVal) { + destLong[index] = tempVal; + } + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset; + for (int index = 0; + index < destFloat.length; + index++) { + float tempVal = srcFloat[srcIndices[index]]; + if (destFloat[index] > tempVal) { + destFloat[index] = tempVal; + } + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset; + for (int index = 0; + index < destDouble.length; + index++) { + double tempVal = srcDouble[srcIndices[index]]; + if (destDouble[index] > tempVal) { + destDouble[index] = tempVal; + } + } + } + } + } + + @Override + public void aggregate(Aggregate dest, Aggregate src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + + for (int index = 0; + index < destByte.length; + index++) { + if (destByte[index] > 
srcByte[index]) { + destByte[index] = srcByte[index]; + } + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + + for (int index = 0; + index < destShort.length; + index++) { + if (destShort[index] > srcShort[index]) { + destShort[index] = srcShort[index]; + } + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != null) { + int[] srcInteger = srcAggs.getFieldsInteger(); + + for (int index = 0; + index < destInteger.length; + index++) { + if (destInteger[index] > srcInteger[index]) { + destInteger[index] = srcInteger[index]; + } + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + + for (int index = 0; + index < destLong.length; + index++) { + if (destLong[index] > srcLong[index]) { + destLong[index] = srcLong[index]; + } + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + + for (int index = 0; + index < destFloat.length; + index++) { + if (destFloat[index] > srcFloat[index]) { + destFloat[index] = srcFloat[index]; + } + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + + for (int index = 0; + index < destDouble.length; + index++) { + if (destDouble[index] > srcDouble[index]) { + destDouble[index] = srcDouble[index]; + } + } + } + } + } + + @Override + public Type getOutputType(Type inputType) + { + return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType); + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorOTFType.java 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.dimensions.aggregator;

import java.util.Collections;
import java.util.Map;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

/**
 * This is a convenience enum to store all the information about default {@link OTFAggregator}s
 * in one place.
 *
 * @since 3.1.0
 */
public enum AggregatorOTFType
{
  /**
   * The average {@link OTFAggregator}.
   */
  AVG(AggregatorAverage.INSTANCE);

  /**
   * An unmodifiable map from {@link OTFAggregator} names (the enum constant names) to
   * {@link OTFAggregator}s.
   */
  public static final Map<String, OTFAggregator> NAME_TO_AGGREGATOR;

  static {
    Map<String, OTFAggregator> nameToAggregator = Maps.newHashMap();

    for (AggregatorOTFType aggType : AggregatorOTFType.values()) {
      nameToAggregator.put(aggType.name(), aggType.getAggregator());
    }

    NAME_TO_AGGREGATOR = Collections.unmodifiableMap(nameToAggregator);
  }

  /**
   * The {@link OTFAggregator} assigned to this enum constant. Final so each constant is
   * immutable after construction.
   */
  private final OTFAggregator aggregator;

  /**
   * Creates an {@link OTFAggregator} enum constant with the given aggregator.
   *
   * @param aggregator The non-null {@link OTFAggregator} assigned to this enum constant.
   */
  AggregatorOTFType(OTFAggregator aggregator)
  {
    this.aggregator = Preconditions.checkNotNull(aggregator);
  }

  /**
   * Gets the {@link OTFAggregator} assigned to this enum constant.
   *
   * @return The {@link OTFAggregator} assigned to this enum constant.
   */
  public OTFAggregator getAggregator()
  {
    return aggregator;
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.dimensions.aggregator;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
 * <p>
 * This registry is used by generic dimensions computation operators and dimension stores in order to support
 * plugging different aggregators into the operator. Subclasses of
 * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema} use this registry
 * to support pluggable aggregators when doing dimensions computation, and Subclasses of
 * AppDataSingleSchemaDimensionStoreHDHT use this class as well.
 * </p>
 * <p>
 * The primary purpose of an {@link AggregatorRegistry} is to provide a mapping from aggregator names to aggregators,
 * and to provide mappings from aggregator IDs to aggregators. These mappings are necessary in order to correctly
 * process schemas, App Data queries, and store aggregated data.
 * </p>
 *
 * @since 3.1.0
 */
public class AggregatorRegistry implements Serializable
{
  private static final long serialVersionUID = 20154301642L;

  /**
   * This is a map from {@link IncrementalAggregator} names to {@link IncrementalAggregator}s used by the
   * default {@link AggregatorRegistry}.
   */
  private static final transient Map<String, IncrementalAggregator> DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR;
  /**
   * This is a map from {@link OTFAggregator} names to {@link OTFAggregator}s used by the default
   * {@link AggregatorRegistry}.
   */
  private static final transient Map<String, OTFAggregator> DEFAULT_NAME_TO_OTF_AGGREGATOR;

  //Build the default maps
  static {
    DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR = Maps.newHashMap(AggregatorIncrementalType.NAME_TO_AGGREGATOR);
    DEFAULT_NAME_TO_OTF_AGGREGATOR = Maps.newHashMap(AggregatorOTFType.NAME_TO_AGGREGATOR);
  }

  /**
   * This is a default aggregator registry that can be used in operators.
   */
  public static final AggregatorRegistry DEFAULT_AGGREGATOR_REGISTRY = new AggregatorRegistry(
      DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR, DEFAULT_NAME_TO_OTF_AGGREGATOR,
      AggregatorIncrementalType.NAME_TO_ORDINAL);

  /**
   * This is a flag indicating whether or not this {@link AggregatorRegistry} has been setup before or not.
   */
  private transient boolean setup = false;
  /**
   * This is a map from the class of an {@link IncrementalAggregator} to the name of that
   * {@link IncrementalAggregator}. Populated lazily by {@link #setup()}.
   */
  private transient Map<Class<? extends IncrementalAggregator>, String> classToIncrementalAggregatorName;
  /**
   * This is a map from the name of an {@link OTFAggregator} to the list of the names of all
   * {@link IncrementalAggregator} that are child aggregators of that {@link OTFAggregator}.
   * Populated lazily by {@link #setup()}.
   */
  private transient Map<String, List<String>> otfAggregatorToIncrementalAggregators;
  /**
   * This is a map from the aggregator ID of an {@link IncrementalAggregator} to the corresponding
   * {@link IncrementalAggregator}. Populated lazily by {@link #setup()}.
   */
  private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator;
  /**
   * This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link IncrementalAggregator}.
   */
  private Map<String, IncrementalAggregator> nameToIncrementalAggregator;
  /**
   * This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}.
   */
  private Map<String, OTFAggregator> nameToOTFAggregator;
  /**
   * This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}.
   */
  private Map<String, Integer> incrementalAggregatorNameToID;

  /**
   * This is a helper method used to autogenerate the IDs for each {@link IncrementalAggregator}
   * by hashing the fully-qualified class name of the aggregator.
   *
   * @param nameToAggregator A mapping from the name of an {@link IncrementalAggregator} to the
   *                         {@link IncrementalAggregator}.
   * @return A mapping from the name of an {@link IncrementalAggregator} to the ID assigned to that
   * {@link IncrementalAggregator}.
   */
  private static Map<String, Integer> autoGenIds(Map<String, IncrementalAggregator> nameToAggregator)
  {
    Map<String, Integer> staticAggregatorNameToID = Maps.newHashMap();

    for (Map.Entry<String, IncrementalAggregator> entry : nameToAggregator.entrySet()) {
      staticAggregatorNameToID.put(entry.getKey(), stringHash(entry.getValue().getClass().getName()));
    }

    return staticAggregatorNameToID;
  }

  /**
   * This is a helper method for computing the hash of the string. This is intended to be a static unchanging
   * method since the computed hash is used for aggregator IDs which are used for persistence.
   * <p>
   * <b>Note:</b> Do not change this function it will cause corruption for users updating existing data stores.
   * </p>
   *
   * @return The hash of the given string.
   */
  private static int stringHash(String string)
  {
    int hash = 5381;

    for (int index = 0;
        index < string.length();
        index++) {
      int character = (int)string.charAt(index);
      hash = hash * 33 + character;
    }

    return hash;
  }

  /**
   * This constructor is present for Kryo serialization
   */
  private AggregatorRegistry()
  {
    //for kryo
  }

  /**
   * <p>
   * This creates an {@link AggregatorRegistry} which assigns the given names to the given
   * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor also auto-generates
   * the IDs associated with each {@link IncrementalAggregator} by computing the hashcode of the
   * fully qualified class name of each {@link IncrementalAggregator}.
   * </p>
   * <p>
   * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the
   * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored.
   * </p>
   *
   * @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator},
   *                                    where the string is the name of an {@link IncrementalAggregator} and
   *                                    the value is the {@link IncrementalAggregator} with that name.
   * @param nameToOTFAggregator         This is a map from {@link String} to {@link OTFAggregator}, where the
   *                                    string is the name of an {@link OTFAggregator} and the value is the
   *                                    {@link OTFAggregator} with that name.
   */
  public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
      Map<String, OTFAggregator> nameToOTFAggregator)
  {
    this(nameToIncrementalAggregator,
        nameToOTFAggregator,
        autoGenIds(nameToIncrementalAggregator));
  }

  /**
   * <p>
   * This creates an {@link AggregatorRegistry} which assigns the given names to the given
   * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor assigns IDs to each
   * {@link IncrementalAggregator} by using the provided map from incremental aggregator names to IDs.
   * </p>
   * <p>
   * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the
   * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored.
   * </p>
   *
   * @param nameToIncrementalAggregator   This is a map from {@link String} to {@link IncrementalAggregator},
   *                                      where the string is the name of an {@link IncrementalAggregator} and
   *                                      the value is the {@link IncrementalAggregator} with that name.
   * @param nameToOTFAggregator           This is a map from {@link String} to {@link OTFAggregator}, where the
   *                                      string is the name of an {@link OTFAggregator} and the value is the
   *                                      {@link OTFAggregator} with that name.
   * @param incrementalAggregatorNameToID This is a map from the name of an {@link IncrementalAggregator} to the
   *                                      ID for that {@link IncrementalAggregator}.
   */
  public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator,
      Map<String, OTFAggregator> nameToOTFAggregator,
      Map<String, Integer> incrementalAggregatorNameToID)
  {
    setNameToIncrementalAggregator(nameToIncrementalAggregator);
    setNameToOTFAggregator(nameToOTFAggregator);

    setIncrementalAggregatorNameToID(incrementalAggregatorNameToID);

    validate();
  }

  /**
   * This is a helper method which is used to do validation on the maps provided to the constructor of this class.
   */
  private void validate()
  {
    for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
      Preconditions.checkNotNull(entry.getKey());
      Preconditions.checkNotNull(entry.getValue());
    }

    for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
      Preconditions.checkNotNull(entry.getKey());
      Preconditions.checkNotNull(entry.getValue());
    }

    for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
      Preconditions.checkNotNull(entry.getKey());
      Preconditions.checkNotNull(entry.getValue());
    }
  }

  /**
   * This method is called to initialize various internal datastructures of the {@link AggregatorRegistry}.
   * This method should be called before the {@link AggregatorRegistry} is used.
   */
  @SuppressWarnings({"unchecked", "rawtypes"})
  public void setup()
  {
    if (setup) {
      //If the AggregatorRegistry was already setup. Don't set it up again.
      return;
    }

    setup = true;

    classToIncrementalAggregatorName = Maps.newHashMap();

    for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) {
      classToIncrementalAggregatorName.put((Class)entry.getValue().getClass(), entry.getKey());
    }

    incrementalAggregatorIDToAggregator = Maps.newHashMap();

    for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
      String aggregatorName = entry.getKey();
      int aggregatorID = entry.getValue();
      incrementalAggregatorIDToAggregator.put(aggregatorID,
          nameToIncrementalAggregator.get(aggregatorName));
    }

    otfAggregatorToIncrementalAggregators = Maps.newHashMap();

    for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) {
      String name = entry.getKey();
      List<String> staticAggregators = Lists.newArrayList();

      //Use the entry's value directly instead of re-fetching it from the map.
      OTFAggregator otfAggregator = entry.getValue();

      for (Class clazz : otfAggregator.getChildAggregators()) {
        staticAggregators.add(classToIncrementalAggregatorName.get(clazz));
      }

      otfAggregatorToIncrementalAggregators.put(name, staticAggregators);
    }
  }

  /**
   * This is a helper method which sets and validated the given mapping from an {@link IncrementalAggregator}'s name
   * to an {@link IncrementalAggregator}.
   *
   * @param nameToIncrementalAggregator The mapping from an {@link IncrementalAggregator}'s name to an
   *                                    {@link IncrementalAggregator}.
   */
  private void setNameToIncrementalAggregator(Map<String, IncrementalAggregator> nameToIncrementalAggregator)
  {
    this.nameToIncrementalAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToIncrementalAggregator));
  }

  /**
   * This is a helper method which sets and validates the given mapping from an {@link OTFAggregator}'s name to
   * an {@link OTFAggregator}.
   *
   * @param nameToOTFAggregator The mapping from an {@link OTFAggregator}'s name to an {@link OTFAggregator}.
   */
  private void setNameToOTFAggregator(Map<String, OTFAggregator> nameToOTFAggregator)
  {
    this.nameToOTFAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToOTFAggregator));
  }

  /**
   * Checks if the given aggregatorName is the name of an {@link IncrementalAggregator} or {@link OTFAggregator}
   * registered to this registry. This consults the name maps set in the constructor, so it is safe to call
   * before {@link #setup()}.
   *
   * @param aggregatorName The aggregator name to check.
   * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} or
   * {@link OTFAggregator} registered to this registry. False otherwise.
   */
  public boolean isAggregator(String aggregatorName)
  {
    return nameToIncrementalAggregator.containsKey(aggregatorName) ||
        nameToOTFAggregator.containsKey(aggregatorName);
  }

  /**
   * Checks if the given aggregator name is the name of an {@link IncrementalAggregator} registered
   * to this registry. This consults the name map set in the constructor, so it is safe to call
   * before {@link #setup()}.
   *
   * @param aggregatorName The aggregator name to check.
   * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered
   * to this registry. False otherwise.
   */
  public boolean isIncrementalAggregator(String aggregatorName)
  {
    return nameToIncrementalAggregator.containsKey(aggregatorName);
  }

  /**
   * Gets the mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}'s name.
   * Only populated after {@link #setup()} has been called.
   *
   * @return The mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}'s name.
   */
  public Map<Class<? extends IncrementalAggregator>, String> getClassToIncrementalAggregatorName()
  {
    return classToIncrementalAggregatorName;
  }

  /**
   * Gets the mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}.
   * Only populated after {@link #setup()} has been called.
   *
   * @return The mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}.
   */
  public Map<Integer, IncrementalAggregator> getIncrementalAggregatorIDToAggregator()
  {
    return incrementalAggregatorIDToAggregator;
  }

  /**
   * This a helper method which sets and validates the mapping from {@link IncrementalAggregator} name to
   * {@link IncrementalAggregator} ID.
   *
   * @param incrementalAggregatorNameToID The mapping from {@link IncrementalAggregator} name to
   *                                      {@link IncrementalAggregator} ID.
   */
  private void setIncrementalAggregatorNameToID(Map<String, Integer> incrementalAggregatorNameToID)
  {
    Preconditions.checkNotNull(incrementalAggregatorNameToID);

    for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) {
      Preconditions.checkNotNull(entry.getKey());
      Preconditions.checkNotNull(entry.getValue());
    }

    this.incrementalAggregatorNameToID = Maps.newHashMap(incrementalAggregatorNameToID);
  }

  /**
   * This returns a map from the names of an {@link IncrementalAggregator}s to the corresponding ID of the
   * {@link IncrementalAggregator}.
   *
   * @return Returns a map from the names of an {@link IncrementalAggregator} to the corresponding ID of the
   * {@link IncrementalAggregator}.
   */
  public Map<String, Integer> getIncrementalAggregatorNameToID()
  {
    return incrementalAggregatorNameToID;
  }

  /**
   * Returns the name to {@link OTFAggregator} mapping, where the key is the name of the {@link OTFAggregator}.
   *
   * @return The name to {@link OTFAggregator} mapping.
   */
  public Map<String, OTFAggregator> getNameToOTFAggregators()
  {
    return nameToOTFAggregator;
  }

  /**
   * Returns the mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
   * that {@link OTFAggregator}. Only populated after {@link #setup()} has been called.
   *
   * @return The mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of
   * that {@link OTFAggregator}.
   */
  public Map<String, List<String>> getOTFAggregatorToIncrementalAggregators()
  {
    return otfAggregatorToIncrementalAggregators;
  }

  /**
   * Returns the name to {@link IncrementalAggregator} mapping, where the key is the name of the
   * {@link IncrementalAggregator}.
   *
   * @return The name to {@link IncrementalAggregator} mapping.
   */
  public Map<String, IncrementalAggregator> getNameToIncrementalAggregator()
  {
    return nameToIncrementalAggregator;
  }

  private static final Logger LOG = LoggerFactory.getLogger(AggregatorRegistry.class);
}
+ */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent; + +import com.datatorrent.api.annotation.Name; +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.gpo.GPOUtils; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * This {@link IncrementalAggregator} performs a sum operation over the fields in the given {@link InputEvent}. + * + * @since 3.1.0 + */ +@Name("SUM") +public class AggregatorSum extends AbstractIncrementalAggregator +{ + private static final long serialVersionUID = 20154301649L; + + public AggregatorSum() + { + //Do nothing + } + + @Override + public Aggregate getGroup(InputEvent src, int aggregatorIndex) + { + src.used = true; + Aggregate aggregate = createAggregate(src, + context, + aggregatorIndex); + + GPOMutable value = aggregate.getAggregates(); + GPOUtils.zeroFillNumeric(value); + + return aggregate; + } + + @Override + public void aggregate(Aggregate dest, Aggregate src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + aggregateAggs(destAggs, srcAggs); + } + + public void aggregateAggs(GPOMutable destAggs, GPOMutable srcAggs) + { + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + + for (int index = 0; + index < destByte.length; + index++) { + destByte[index] += srcByte[index]; + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + + for (int index = 0; + index < destShort.length; + index++) { + destShort[index] += srcShort[index]; + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != 
null) { + int[] srcInteger = srcAggs.getFieldsInteger(); + + for (int index = 0; + index < destInteger.length; + index++) { + destInteger[index] += srcInteger[index]; + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + + for (int index = 0; + index < destLong.length; + index++) { + destLong[index] += srcLong[index]; + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + + for (int index = 0; + index < destFloat.length; + index++) { + destFloat[index] += srcFloat[index]; + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + + for (int index = 0; + index < destDouble.length; + index++) { + destDouble[index] += srcDouble[index]; + } + } + } + } + + @Override + public void aggregate(Aggregate dest, InputEvent src) + { + GPOMutable destAggs = dest.getAggregates(); + GPOMutable srcAggs = src.getAggregates(); + + aggregateInput(destAggs, srcAggs); + } + + public void aggregateInput(GPOMutable destAggs, GPOMutable srcAggs) + { + { + byte[] destByte = destAggs.getFieldsByte(); + if (destByte != null) { + byte[] srcByte = srcAggs.getFieldsByte(); + int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset; + for (int index = 0; + index < destByte.length; + index++) { + destByte[index] += srcByte[srcIndices[index]]; + } + } + } + + { + short[] destShort = destAggs.getFieldsShort(); + if (destShort != null) { + short[] srcShort = srcAggs.getFieldsShort(); + int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset; + for (int index = 0; + index < destShort.length; + index++) { + destShort[index] += srcShort[srcIndices[index]]; + } + } + } + + { + int[] destInteger = destAggs.getFieldsInteger(); + if (destInteger != null) { + int[] srcInteger = srcAggs.getFieldsInteger(); + int[] 
srcIndices = context.indexSubsetAggregates.fieldsIntegerIndexSubset; + for (int index = 0; + index < destInteger.length; + index++) { + destInteger[index] += srcInteger[srcIndices[index]]; + } + } + } + + { + long[] destLong = destAggs.getFieldsLong(); + if (destLong != null) { + long[] srcLong = srcAggs.getFieldsLong(); + int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset; + for (int index = 0; + index < destLong.length; + index++) { + destLong[index] += srcLong[srcIndices[index]]; + } + } + } + + { + float[] destFloat = destAggs.getFieldsFloat(); + if (destFloat != null) { + float[] srcFloat = srcAggs.getFieldsFloat(); + int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset; + for (int index = 0; + index < destFloat.length; + index++) { + destFloat[index] += srcFloat[srcIndices[index]]; + } + } + } + + { + double[] destDouble = destAggs.getFieldsDouble(); + if (destDouble != null) { + double[] srcDouble = srcAggs.getFieldsDouble(); + int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset; + for (int index = 0; + index < destDouble.length; + index++) { + destDouble[index] += srcDouble[srcIndices[index]]; + } + } + } + } + + @Override + public Type getOutputType(Type inputType) + { + return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType); + } + + @Override + public FieldsDescriptor getMetaDataDescriptor() + { + return null; + } + + private static final Logger LOG = LoggerFactory.getLogger(AggregatorSum.class); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java new file mode 100644 index 0000000..6085254 --- /dev/null +++ 
b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/AggregatorUtils.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import java.util.Collections; +import java.util.Map; + +import com.google.common.collect.Maps; + +import com.datatorrent.lib.appdata.schemas.Fields; +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * This class contains utility methods which are useful for aggregators. + * + * @since 3.1.0 + */ +public final class AggregatorUtils +{ + /** + * This is an identity type map, which maps input types to the same output types. + */ + public static final transient Map<Type, Type> IDENTITY_TYPE_MAP; + /** + * This is an identity type map, for numeric types only. This is + * helpful when creating aggregators like {@link AggregatorSum}, where the sum of ints is an + * int and the sum of floats is a float. 
+ */ +public static final transient Map<Type, Type> IDENTITY_NUMBER_TYPE_MAP; + + static { + Map<Type, Type> identityTypeMap = Maps.newHashMap(); + + for (Type type : Type.values()) { + identityTypeMap.put(type, type); + } + + IDENTITY_TYPE_MAP = Collections.unmodifiableMap(identityTypeMap); + + Map<Type, Type> identityNumberTypeMap = Maps.newHashMap(); + + for (Type type : Type.NUMERIC_TYPES) { + identityNumberTypeMap.put(type, type); + } + + IDENTITY_NUMBER_TYPE_MAP = Collections.unmodifiableMap(identityNumberTypeMap); + } + + /** + * Don't instantiate this class. + */ + private AggregatorUtils() + { + //Don't instantiate this class. + } + + /** + * This is a helper method which takes a {@link FieldsDescriptor} object, which defines the types of the fields + * that the {@link IncrementalAggregator} receives as input. It then uses the given {@link IncrementalAggregator} + * and {@link FieldsDescriptor} object to compute the {@link FieldsDescriptor} object for the aggregation produced + * by the given + * {@link IncrementalAggregator} when it receives an input corresponding to the given input {@link FieldsDescriptor}. + * + * @param inputFieldsDescriptor This is a {@link FieldsDescriptor} object which defines the names and types of input + * data received by an aggregator. + * @param incrementalAggregator This is the + * {@link IncrementalAggregator} for which an output {@link FieldsDescriptor} needs + * to be computed. + * @return The output {@link FieldsDescriptor} for this aggregator when it receives input data with the same schema as + * the specified input {@link FieldsDescriptor}. 
+ */ + public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor, + IncrementalAggregator incrementalAggregator) + { + Map<String, Type> fieldToType = Maps.newHashMap(); + + for (Map.Entry<String, Type> entry : + inputFieldsDescriptor.getFieldToType().entrySet()) { + String fieldName = entry.getKey(); + Type fieldType = entry.getValue(); + Type outputType = incrementalAggregator.getOutputType(fieldType); + fieldToType.put(fieldName, outputType); + } + + return new FieldsDescriptor(fieldToType); + } + + /** + * This is a utility method which creates an output {@link FieldsDescriptor} using the field names + * from the given {@link FieldsDescriptor} and the output type of the given {@link OTFAggregator}. + * + * @param inputFieldsDescriptor The {@link FieldsDescriptor} from which to derive the field names used + * for the output fields descriptor. + * @param otfAggregator The {@link OTFAggregator} to use for creating the output {@link FieldsDescriptor}. + * @return The output {@link FieldsDescriptor}. + */ + public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor, + OTFAggregator otfAggregator) + { + Map<String, Type> fieldToType = Maps.newHashMap(); + + for (Map.Entry<String, Type> entry : + inputFieldsDescriptor.getFieldToType().entrySet()) { + String fieldName = entry.getKey(); + Type outputType = otfAggregator.getOutputType(); + fieldToType.put(fieldName, outputType); + } + + return new FieldsDescriptor(fieldToType); + } + + /** + * This is a utility method which creates an output {@link FieldsDescriptor} from the + * given field names and the given {@link OTFAggregator}. + * + * @param fields The names of the fields to be included in the output {@link FieldsDescriptor}. + * @param otfAggregator The {@link OTFAggregator} to use when creating the output {@link FieldsDescriptor}. + * @return The output {@link FieldsDescriptor}. 
+ */ + public static FieldsDescriptor getOutputFieldsDescriptor(Fields fields, + OTFAggregator otfAggregator) + { + Map<String, Type> fieldToType = Maps.newHashMap(); + + for (String field : fields.getFields()) { + fieldToType.put(field, otfAggregator.getOutputType()); + } + + return new FieldsDescriptor(fieldToType); + } +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/IncrementalAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/IncrementalAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/IncrementalAggregator.java new file mode 100644 index 0000000..33e868f --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/IncrementalAggregator.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import org.apache.apex.malhar.lib.dimensions.DimensionsConversionContext; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.InputEvent; +import org.apache.apex.malhar.lib.dimensions.aggregator.AggregateEvent.Aggregator; + +import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * <p> + * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a + * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for + * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an + * {@link IncrementalAggregator}. + * </p> + * <p> + * {@link IncrementalAggregator}s are intended to be used with subclasses of + * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which + * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator + * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which + * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to + * the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields, + * one for cost and one for revenue. + * </p> + * + */ +public interface IncrementalAggregator extends Aggregator<InputEvent, Aggregate> +{ + /** + * This method defines the type mapping for the {@link IncrementalAggregator}. The type mapping defines the + * relationship between the type of an input field and the type of its aggregate. 
For example if the aggregator takes + * a field of type int and produces an aggregate of type float, then this method would return a type of float when + * the given input type is an int. + * @param inputType The type of a field to be aggregate. + * @return The type of the aggregate corresponding to an input field of the given type. + */ + public Type getOutputType(Type inputType); + + /** + * This sets + */ + public void setDimensionsConversionContext(DimensionsConversionContext context); + + /** + * Returns a {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations. + * This method returns null if this aggregator stores no metadata. + * @return A {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations. + * This method returns null if this aggregator stores no metadata. + */ + public FieldsDescriptor getMetaDataDescriptor(); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/OTFAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/OTFAggregator.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/OTFAggregator.java new file mode 100644 index 0000000..cef32db --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/aggregator/OTFAggregator.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dimensions.aggregator; + +import java.io.Serializable; + +import java.util.List; + +import com.datatorrent.lib.appdata.gpo.GPOMutable; +import com.datatorrent.lib.appdata.schemas.Type; + +/** + * <p> + * This interface represents an On The Fly Aggregator. On the fly aggregators represent a class + * of aggregations which use the results of incremental aggregators, which implement the + * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator} interface. An example of an aggregation which + * needs to be performed on the fly is average. Average needs to be performed on the fly because average cannot be + * computed with just an existing average and a new data item, an average requires the sum of all data items, and the + * count of all data items. An example implementation of average is {@link AggregatorAverage}. Also note + * that unlike {@link IncrementalAggregator}s an {@link OTFAggregator} only has one output type. This is done + * because {@link OTFAggregator}s usually represent a very specific computation, with a specific output type. + * For example, average is a computation that you will almost always want to produce a double. But if you require + * an average operation that produces an integer, that could be done as a separate {@link OTFAggregator}. + * </p> + * <p> + * The primary usage for {@link OTFAggregator}s is in store operators which respond to queries. Currently, + * the only places which utilize {@link OTFAggregator}s are subclasses of the DimensionsStoreHDHT operator. 
+ * </p> + * <p> + * This interface extends {@link Serializable} because On The Fly aggregators may be set + * as properties on some operators and operator properties are required to be java serializable. + * </p> + * @since 3.1.0 + */ +public interface OTFAggregator extends Serializable +{ + public static final long serialVersionUID = 201505251039L; + + /** + * This method returns all the incremental aggregators on which this aggregator depends + * to compute its result. In the case of {@link AggregatorAverage} its child aggregators are + * {@link AggregatorCount} and {@link AggregatorSum}. + * @return All the incremental aggregators on which this aggregator depends to compute its + * result. + */ + + public List<Class<? extends IncrementalAggregator>> getChildAggregators(); + /** + * This method performs an on the fly aggregation from the given aggregates. The aggregates + * provided to this aggregator are each the result of one of this aggregator's child aggregators. + * The order in which the aggregates are passed to this method is the same as the order in + * which the child aggregators are listed in the result of the {@link #getChildAggregators} method. + * Also note that this aggregator does not aggregate one field at a time. This aggregator receives + * a batch of fields from each child aggregator, and the result of the method is also a batch of fields. + * @param aggregates These are the results of all the child aggregators. The results are in the same + * order as the child aggregators specified in the result of the {@link #getChildAggregators} method. + * @return The result of the on the fly aggregation. + */ + + public GPOMutable aggregate(GPOMutable... aggregates); + /** + * Returns the output type of the {@link OTFAggregator}. <b>Note</b> that any combination of input types + * will produce the same output type for {@link OTFAggregator}s. + * @return The output type of the {@link OTFAggregator}. 
+ */ + + public Type getOutputType(); +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/package-info.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/package-info.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/package-info.java new file mode 100644 index 0000000..acee645 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/package-info.java @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ [email protected] +package org.apache.apex.malhar.lib.dimensions; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java index 67f8c76..00a925b 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java +++ b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/CustomTimeBucketRegistryTest.java @@ -18,13 +18,13 @@ */ package com.datatorrent.lib.appdata.dimensions; - import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.dimensions.CustomTimeBucketRegistry; + import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; import com.datatorrent.lib.appdata.schemas.TimeBucket; -import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry; public class CustomTimeBucketRegistryTest { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java index 43b71ef..fd69676 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java +++ b/library/src/test/java/com/datatorrent/lib/appdata/dimensions/DimensionsEventTest.java @@ -20,15 +20,16 @@ package com.datatorrent.lib.appdata.dimensions; import java.util.Map; -import com.google.common.collect.Maps; - import org.junit.Assert; import org.junit.Test; +import 
org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey; + +import com.google.common.collect.Maps; + import com.datatorrent.lib.appdata.gpo.GPOMutable; import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; import com.datatorrent.lib.appdata.schemas.Type; -import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey; public class DimensionsEventTest { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java index 26a06bd..b3669bc 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java +++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchemaTest.java @@ -24,23 +24,24 @@ import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import org.codehaus.jettison.json.JSONArray; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.codehaus.jettison.json.JSONArray; + +import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor; +import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorIncrementalType; +import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema.DimensionsCombination; import 
com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema.Key; import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema.Value; -import com.datatorrent.lib.dimensions.DimensionsDescriptor; -import com.datatorrent.lib.dimensions.aggregator.AggregatorIncrementalType; -import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry; public class DimensionalConfigurationSchemaTest { @@ -56,9 +57,9 @@ public class DimensionalConfigurationSchemaTest public void noEnumTest() { //Test if loading of no enums works - DimensionalConfigurationSchema des = - new DimensionalConfigurationSchema(SchemaUtils.jarResourceFileToString("adsGenericEventSchemaNoEnums.json"), - AggregatorRegistry.DEFAULT_AGGREGATOR_REGISTRY); + DimensionalConfigurationSchema des = new DimensionalConfigurationSchema( + SchemaUtils.jarResourceFileToString("adsGenericEventSchemaNoEnums.json"), + AggregatorRegistry.DEFAULT_AGGREGATOR_REGISTRY); DimensionalSchema dimensionalSchema = new DimensionalSchema(des); dimensionalSchema.getSchemaJSON(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java index a98d346..50b539e 100644 --- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java +++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/DimensionalSchemaTest.java @@ -24,20 +24,21 @@ import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import org.codehaus.jettison.json.JSONArray; -import org.codehaus.jettison.json.JSONObject; import org.junit.Assert; 
import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry; +import org.codehaus.jettison.json.JSONArray; +import org.codehaus.jettison.json.JSONObject; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + import com.datatorrent.lib.appdata.query.serde.MessageSerializerFactory; -import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry; public class DimensionalSchemaTest { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java b/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java deleted file mode 100644 index 5c4feed..0000000 --- a/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.dimensions; - -import org.junit.Assert; -import org.junit.Test; - -import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; -import com.datatorrent.lib.appdata.schemas.TimeBucket; -import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry; - - -public class CustomTimeBucketRegistryTest -{ - @Test - public void testBuildingRegistry() - { - CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry(); - - CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE); - CustomTimeBucket c1h = new CustomTimeBucket(TimeBucket.HOUR); - CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY); - - timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal()); - timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal()); - timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal()); - - CustomTimeBucket customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.MINUTE.ordinal()); - Assert.assertTrue(customTimeBucket.isUnit()); - Assert.assertEquals(TimeBucket.MINUTE, customTimeBucket.getTimeBucket()); - - customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.HOUR.ordinal()); - Assert.assertTrue(customTimeBucket.isUnit()); - Assert.assertEquals(TimeBucket.HOUR, customTimeBucket.getTimeBucket()); - - customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.DAY.ordinal()); - Assert.assertTrue(customTimeBucket.isUnit()); - Assert.assertEquals(TimeBucket.DAY, customTimeBucket.getTimeBucket()); - - Assert.assertEquals(TimeBucket.MINUTE.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1m)); - Assert.assertEquals(TimeBucket.HOUR.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1h)); - Assert.assertEquals(TimeBucket.DAY.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1d)); - } - - @Test - public void testRegister() - { - CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry(); - - CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE); - CustomTimeBucket c1h = 
new CustomTimeBucket(TimeBucket.HOUR); - CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY); - - timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal()); - timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal()); - timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal()); - - int max = Integer.MIN_VALUE; - max = Math.max(max, TimeBucket.MINUTE.ordinal()); - max = Math.max(max, TimeBucket.HOUR.ordinal()); - max = Math.max(max, TimeBucket.DAY.ordinal()); - - CustomTimeBucket c5m = new CustomTimeBucket(TimeBucket.MINUTE, 5L); - - timeBucketRegistry.register(c5m); - int timeBucketId = timeBucketRegistry.getTimeBucketId(c5m); - - Assert.assertEquals(max + 1, timeBucketId); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java b/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java deleted file mode 100644 index 54682b1..0000000 --- a/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
package com.datatorrent.lib.dimensions;

import java.util.Set;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;

import org.junit.Assert;
import org.junit.Test;

import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
import com.datatorrent.lib.appdata.schemas.Fields;
import com.datatorrent.lib.appdata.schemas.TimeBucket;
import com.datatorrent.lib.appdata.schemas.Type;

/**
 * Unit tests for {@link DimensionsDescriptor}: parsing of aggregation strings
 * (single key, multiple keys, and an embedded time dimension) and the
 * equals/hashCode contract of the value-based constructor.
 */
public class DimensionsDescriptorTest
{
  public static final String KEY_1_NAME = "key1";
  public static final Type KEY_1_TYPE = Type.INTEGER;
  public static final String KEY_2_NAME = "key2";
  public static final Type KEY_2_TYPE = Type.STRING;

  public static final String AGG_1_NAME = "agg1";
  public static final Type AGG_1_TYPE = Type.INTEGER;
  public static final String AGG_2_NAME = "agg2";
  public static final Type AGG_2_TYPE = Type.STRING;

  /** A single-key descriptor exposes that key and has no time bucket. */
  @Test
  public void simpleTest1()
  {
    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME);

    Set<String> fields = Sets.newHashSet();
    fields.add(KEY_1_NAME);

    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
    Assert.assertNull("The timeunit should be null.", ad.getTimeBucket());
  }

  /** Keys joined by the separator delimiter are all exposed as fields. */
  @Test
  public void simpleTest2()
  {
    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME
        + DimensionsDescriptor.DELIMETER_SEPERATOR
        + KEY_2_NAME);

    Set<String> fields = Sets.newHashSet();
    fields.add(KEY_1_NAME);
    fields.add(KEY_2_NAME);

    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
    Assert.assertNull("The timeunit should be null.", ad.getTimeBucket());
  }

  /** A "time=DAYS" component becomes the time bucket, not a key field. */
  @Test
  public void simpleTimeTest()
  {
    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME
        + DimensionsDescriptor.DELIMETER_SEPERATOR
        + DimensionsDescriptor.DIMENSION_TIME
        + DimensionsDescriptor.DELIMETER_EQUALS
        + "DAYS");

    Set<String> fields = Sets.newHashSet();
    fields.add(KEY_1_NAME);

    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
    Assert.assertEquals("The timeunit should be DAYS.", TimeUnit.DAYS, ad.getTimeBucket().getTimeUnit());
  }

  /**
   * Two descriptors built from equal buckets and fields must be equal in both
   * directions and produce the same hash code (equals/hashCode contract).
   */
  @Test
  public void equalsAndHashCodeTest()
  {
    DimensionsDescriptor ddA = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
        new Fields(Sets.newHashSet("a", "b")));

    DimensionsDescriptor ddB = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
        new Fields(Sets.newHashSet("a", "b")));

    // Symmetry: a.equals(b) must agree with b.equals(a).
    Assert.assertTrue(ddB.equals(ddA));
    Assert.assertTrue(ddA.equals(ddB));
    // Equal objects must have equal hash codes.
    Assert.assertEquals(ddA.hashCode(), ddB.hashCode());
  }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.dimensions; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.apex.malhar.lib.dimensions.CustomTimeBucketRegistry; + +import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; +import com.datatorrent.lib.appdata.schemas.TimeBucket; + +public class CustomTimeBucketRegistryTest +{ + @Test + public void testBuildingRegistry() + { + CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry(); + + CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE); + CustomTimeBucket c1h = new CustomTimeBucket(TimeBucket.HOUR); + CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY); + + timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal()); + timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal()); + timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal()); + + CustomTimeBucket customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.MINUTE.ordinal()); + Assert.assertTrue(customTimeBucket.isUnit()); + Assert.assertEquals(TimeBucket.MINUTE, customTimeBucket.getTimeBucket()); + + customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.HOUR.ordinal()); + Assert.assertTrue(customTimeBucket.isUnit()); + Assert.assertEquals(TimeBucket.HOUR, customTimeBucket.getTimeBucket()); + + customTimeBucket = timeBucketRegistry.getTimeBucket(TimeBucket.DAY.ordinal()); + Assert.assertTrue(customTimeBucket.isUnit()); + Assert.assertEquals(TimeBucket.DAY, customTimeBucket.getTimeBucket()); + + Assert.assertEquals(TimeBucket.MINUTE.ordinal(), 
(int)timeBucketRegistry.getTimeBucketId(c1m)); + Assert.assertEquals(TimeBucket.HOUR.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1h)); + Assert.assertEquals(TimeBucket.DAY.ordinal(), (int)timeBucketRegistry.getTimeBucketId(c1d)); + } + + @Test + public void testRegister() + { + CustomTimeBucketRegistry timeBucketRegistry = new CustomTimeBucketRegistry(); + + CustomTimeBucket c1m = new CustomTimeBucket(TimeBucket.MINUTE); + CustomTimeBucket c1h = new CustomTimeBucket(TimeBucket.HOUR); + CustomTimeBucket c1d = new CustomTimeBucket(TimeBucket.DAY); + + timeBucketRegistry.register(c1m, TimeBucket.MINUTE.ordinal()); + timeBucketRegistry.register(c1h, TimeBucket.HOUR.ordinal()); + timeBucketRegistry.register(c1d, TimeBucket.DAY.ordinal()); + + int max = Integer.MIN_VALUE; + max = Math.max(max, TimeBucket.MINUTE.ordinal()); + max = Math.max(max, TimeBucket.HOUR.ordinal()); + max = Math.max(max, TimeBucket.DAY.ordinal()); + + CustomTimeBucket c5m = new CustomTimeBucket(TimeBucket.MINUTE, 5L); + + timeBucketRegistry.register(c5m); + int timeBucketId = timeBucketRegistry.getTimeBucketId(c5m); + + Assert.assertEquals(max + 1, timeBucketId); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java new file mode 100644 index 0000000..3101577 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/dimensions/DimensionsDescriptorTest.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
package org.apache.apex.malhar.lib.dimensions;

import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.junit.Assert;
import org.junit.Test;

import com.google.common.collect.Sets;

import com.datatorrent.lib.appdata.schemas.CustomTimeBucket;
import com.datatorrent.lib.appdata.schemas.Fields;
import com.datatorrent.lib.appdata.schemas.TimeBucket;
import com.datatorrent.lib.appdata.schemas.Type;

/**
 * Unit tests for {@link DimensionsDescriptor}: parsing of aggregation strings
 * (single key, multiple keys, and an embedded time dimension) and the
 * equals/hashCode contract of the value-based constructor.
 */
public class DimensionsDescriptorTest
{
  public static final String KEY_1_NAME = "key1";
  public static final Type KEY_1_TYPE = Type.INTEGER;
  public static final String KEY_2_NAME = "key2";
  public static final Type KEY_2_TYPE = Type.STRING;

  public static final String AGG_1_NAME = "agg1";
  public static final Type AGG_1_TYPE = Type.INTEGER;
  public static final String AGG_2_NAME = "agg2";
  public static final Type AGG_2_TYPE = Type.STRING;

  /** A single-key descriptor exposes that key and has no time bucket. */
  @Test
  public void simpleTest1()
  {
    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME);

    Set<String> fields = Sets.newHashSet();
    fields.add(KEY_1_NAME);

    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
    Assert.assertNull("The timeunit should be null.", ad.getTimeBucket());
  }

  /** Keys joined by the separator delimiter are all exposed as fields. */
  @Test
  public void simpleTest2()
  {
    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME
        + DimensionsDescriptor.DELIMETER_SEPERATOR
        + KEY_2_NAME);

    Set<String> fields = Sets.newHashSet();
    fields.add(KEY_1_NAME);
    fields.add(KEY_2_NAME);

    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
    Assert.assertNull("The timeunit should be null.", ad.getTimeBucket());
  }

  /** A "time=DAYS" component becomes the time bucket, not a key field. */
  @Test
  public void simpleTimeTest()
  {
    DimensionsDescriptor ad = new DimensionsDescriptor(KEY_1_NAME
        + DimensionsDescriptor.DELIMETER_SEPERATOR
        + DimensionsDescriptor.DIMENSION_TIME
        + DimensionsDescriptor.DELIMETER_EQUALS
        + "DAYS");

    Set<String> fields = Sets.newHashSet();
    fields.add(KEY_1_NAME);

    Assert.assertEquals("The fields should match.", fields, ad.getFields().getFields());
    Assert.assertEquals("The timeunit should be DAYS.", TimeUnit.DAYS, ad.getTimeBucket().getTimeUnit());
  }

  /**
   * Two descriptors built from equal buckets and fields must be equal in both
   * directions and produce the same hash code (equals/hashCode contract).
   */
  @Test
  public void equalsAndHashCodeTest()
  {
    DimensionsDescriptor ddA = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
        new Fields(Sets.newHashSet("a", "b")));

    DimensionsDescriptor ddB = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
        new Fields(Sets.newHashSet("a", "b")));

    // Symmetry: a.equals(b) must agree with b.equals(a).
    Assert.assertTrue(ddB.equals(ddA));
    Assert.assertTrue(ddA.equals(ddB));
    // Equal objects must have equal hash codes.
    Assert.assertEquals(ddA.hashCode(), ddB.hashCode());
  }
}
