Repository: incubator-apex-malhar Updated Branches: refs/heads/devel-3 b067525de -> 8d48e403a
MLHR-1944 #resolve #comment added dimensions helper classes Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a4718f02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a4718f02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a4718f02 Branch: refs/heads/devel-3 Commit: a4718f02434736bf6d9e5f98d01e923e950503f5 Parents: c31c5ea 691ee6f Author: Timothy Farkas <[email protected]> Authored: Tue Dec 15 12:45:13 2015 -0800 Committer: brightchen <[email protected]> Committed: Tue Dec 15 16:47:53 2015 -0800 ---------------------------------------------------------------------- .../dimensions/CustomTimeBucketRegistry.java | 138 ++++++ .../dimensions/DimensionsConversionContext.java | 114 +++++ .../lib/dimensions/DimensionsDescriptor.java | 420 +++++++++++++++++++ .../dimensions/aggregator/AggregateEvent.java | 35 ++ .../CustomTimeBucketRegistryTest.java | 87 ++++ .../dimensions/DimensionsDescriptorTest.java | 101 +++++ 6 files changed, 895 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java ---------------------------------------------------------------------- diff --cc library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java index 0000000,0000000..fc11647 new file mode 100644 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java @@@ -1,0 -1,0 +1,138 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++package com.datatorrent.lib.dimensions; ++ ++import java.io.Serializable; ++import java.util.HashMap; ++import java.util.Map; ++ ++import com.google.common.base.Preconditions; ++ ++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; ++ ++import it.unimi.dsi.fastutil.ints.Int2ObjectMap; ++import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; ++import it.unimi.dsi.fastutil.objects.Object2IntMap; ++import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; ++ ++
/**
 * A registry which assigns integer IDs to {@link CustomTimeBucket}s. A registered time bucket can be
 * looked up by its ID or by its text ({@link CustomTimeBucket#getText()}), and an ID can be looked up
 * by its time bucket.
 */
public class CustomTimeBucketRegistry implements Serializable
{
  private static final long serialVersionUID = 201509221536L;

  /**
   * The ID used by the next call to {@link #register(CustomTimeBucket)}.
   */
  private int currentId;

  private Int2ObjectMap<CustomTimeBucket> idToTimeBucket = new Int2ObjectOpenHashMap<>();
  private Object2IntMap<CustomTimeBucket> timeBucketToId = new Object2IntOpenHashMap<>();
  private Map<String, CustomTimeBucket> textToTimeBucket = new HashMap<>();

  /**
   * Creates an empty registry whose first automatically assigned ID is 0.
   */
  public CustomTimeBucketRegistry()
  {
  }

  /**
   * Creates an empty registry whose first automatically assigned ID is the given one.
   *
   * @param startingId The ID used by the first call to {@link #register(CustomTimeBucket)}.
   */
  public CustomTimeBucketRegistry(int startingId)
  {
    this.currentId = startingId;
  }

  /**
   * Creates a registry pre-populated with the given ID to time bucket mapping. The given map is adopted
   * (not copied) as the registry's ID lookup, exactly as the two argument constructor does.
   *
   * @param idToTimeBucket The initial ID to time bucket mapping. Must not be null or contain null buckets.
   */
  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
  {
    int largestId = initialize(idToTimeBucket);

    // Previously the map was only indexed, never stored, so getTimeBucket(int) always returned null.
    this.idToTimeBucket = idToTimeBucket;

    // Same advance rule as register(): never hand out an ID that is already taken. The upper bound
    // check avoids overflowing past Integer.MAX_VALUE.
    if (largestId >= currentId && largestId < Integer.MAX_VALUE) {
      currentId = largestId + 1;
    }
  }

  /**
   * Creates a registry pre-populated with the given ID to time bucket mapping, which starts assigning
   * IDs at the given starting ID.
   *
   * @param idToTimeBucket The initial ID to time bucket mapping. Must not be null or contain null buckets.
   * @param startingId     The ID used by the first call to {@link #register(CustomTimeBucket)}. It must be
   *                       larger than every ID in the given mapping.
   * @throws IllegalArgumentException if startingId is not larger than the largest ID in the mapping.
   */
  public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket,
      int startingId)
  {
    int largestId = initialize(idToTimeBucket);

    Preconditions.checkArgument(largestId < startingId, "The startingId " + startingId
        + " must be larger than the largest ID " + largestId
        + " in the given idToTimeBucket mapping");

    this.idToTimeBucket = idToTimeBucket;
    this.currentId = startingId;
  }

  /**
   * Indexes the given mapping into the reverse (bucket to ID) and text lookups.
   *
   * @param idToTimeBucket The mapping to index.
   * @return The largest ID found in the mapping, or {@link Integer#MIN_VALUE} if the mapping is empty.
   */
  private int initialize(Int2ObjectMap<CustomTimeBucket> idToTimeBucket)
  {
    Preconditions.checkNotNull(idToTimeBucket);

    int largestId = Integer.MIN_VALUE;

    for (int timeBucketId : idToTimeBucket.keySet()) {
      // Validate before dereferencing; the original called getText() ahead of the null check.
      CustomTimeBucket customTimeBucket = Preconditions.checkNotNull(idToTimeBucket.get(timeBucketId),
          "The time bucket for ID " + timeBucketId + " is null.");

      largestId = Math.max(largestId, timeBucketId);
      textToTimeBucket.put(customTimeBucket.getText(), customTimeBucket);
      timeBucketToId.put(customTimeBucket, timeBucketId);
    }

    return largestId;
  }

  /**
   * Gets the time bucket registered under the given ID.
   *
   * @param timeBucketId The ID to look up.
   * @return The result of the ID lookup for the given ID.
   */
  public CustomTimeBucket getTimeBucket(int timeBucketId)
  {
    return idToTimeBucket.get(timeBucketId);
  }

  /**
   * Gets the ID under which the given time bucket is registered.
   *
   * @param timeBucket The time bucket to look up.
   * @return The ID of the time bucket, or null if the time bucket is not registered.
   */
  public Integer getTimeBucketId(CustomTimeBucket timeBucket)
  {
    if (!timeBucketToId.containsKey(timeBucket)) {
      return null;
    }

    return timeBucketToId.get(timeBucket);
  }

  /**
   * Gets the registered time bucket with the given text.
   *
   * @param text The text of the time bucket, as returned by {@link CustomTimeBucket#getText()}.
   * @return The matching time bucket, or null if none is registered under that text.
   */
  public CustomTimeBucket getTimeBucket(String text)
  {
    return textToTimeBucket.get(text);
  }

  /**
   * Registers the given time bucket under the next automatically assigned ID.
   *
   * @param timeBucket The time bucket to register.
   * @throws IllegalArgumentException if the time bucket, or the next ID, is already registered.
   */
  public void register(CustomTimeBucket timeBucket)
  {
    register(timeBucket, currentId);
  }

  /**
   * Registers the given time bucket under the given ID. If the ID is at or beyond the next automatically
   * assigned ID, the next automatically assigned ID becomes {@code timeBucketId + 1}.
   *
   * @param timeBucket   The time bucket to register. Must not be null.
   * @param timeBucketId The ID to register the time bucket under.
   * @throws IllegalArgumentException if the time bucket or the ID is already registered.
   */
  public void register(CustomTimeBucket timeBucket, int timeBucketId)
  {
    // Fail before touching any map; the original only failed on getText() after two maps were updated.
    Preconditions.checkNotNull(timeBucket);

    if (timeBucketToId.containsKey(timeBucket)) {
      throw new IllegalArgumentException("The timeBucket " + timeBucket + " is already registered.");
    }

    if (timeBucketToId.containsValue(timeBucketId)) {
      throw new IllegalArgumentException("The timeBucketId " + timeBucketId + " is already registered.");
    }

    idToTimeBucket.put(timeBucketId, timeBucket);
    timeBucketToId.put(timeBucket, timeBucketId);
    textToTimeBucket.put(timeBucket.getText(), timeBucket);

    if (timeBucketId >= currentId) {
      currentId = timeBucketId + 1;
    }
  }

  @Override
  public String toString()
  {
    return "CustomTimeBucketRegistry{" + "idToTimeBucket=" + idToTimeBucket + '}';
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java ---------------------------------------------------------------------- diff --cc library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java index 0000000,0000000..9247320 new file mode 100644 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java @@@ -1,0 -1,0 +1,114 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++package com.datatorrent.lib.dimensions; ++ ++import java.io.Serializable; ++ ++import com.datatorrent.lib.appdata.gpo.GPOUtils.IndexSubset; ++import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; ++ ++/** ++ * This is a context object used to convert {@link InputEvent}s into aggregates ++ * in {@link IncrementalAggregator}s. 
++ */ ++public class DimensionsConversionContext implements Serializable ++{ ++ private static final long serialVersionUID = 201506151157L; ++ /** ++ * The {@link CustomTimeBucketRegistry} held by this context. ++ * NOTE(review): presumably used to translate between custom time buckets and their IDs ++ * during conversion; confirm against the aggregators which read this field. ++ */ ++ public CustomTimeBucketRegistry customTimeBucketRegistry; ++ /** ++ * The schema ID for {@link Aggregate}s emitted by the ++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s ++ * holding this context. ++ */ ++ public int schemaID; ++ /** ++ * The dimensionsDescriptor ID for {@link Aggregate}s emitted by the ++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s ++ * holding this context. ++ */ ++ public int dimensionsDescriptorID; ++ /** ++ * The aggregator ID for {@link Aggregate}s emitted by the ++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s ++ * holding this context. ++ */ ++ public int aggregatorID; ++ /** ++ * The {@link DimensionsDescriptor} corresponding to the given dimension ++ * descriptor id. ++ */ ++ public DimensionsDescriptor dd; ++ /** ++ * The {@link FieldsDescriptor} for the aggregate of the {@link Aggregate}s ++ * emitted by the ++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s ++ * holding this context object. ++ */ ++ public FieldsDescriptor aggregateDescriptor; ++ /** ++ * The {@link FieldsDescriptor} for the key of the {@link Aggregate}s emitted ++ * by the ++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s ++ * holding this context object. ++ */ ++ public FieldsDescriptor keyDescriptor; ++ /** ++ * The index of the timestamp field within the key of {@link InputEvent}s ++ * received by the ++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s ++ * holding this context object. This is -1 if the {@link InputEvent} key has ++ * no timestamp. 
++ */ ++ public int inputTimestampIndex; ++ /** ++ * The index of the timestamp field within the key of {@link Aggregate}s ++ * emitted by the ++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s ++ * holding this context object. This is -1 if the {@link Aggregate}'s key has ++ * no timestamp. ++ */ ++ public int outputTimestampIndex; ++ /** ++ * The index of the time bucket field within the key of {@link Aggregate}s ++ * emitted by the ++ * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s ++ * holding this context object. This is -1 if the {@link Aggregate}'s key has ++ * no time bucket. ++ */ ++ public int outputTimebucketIndex; ++ /** ++ * The {@link IndexSubset} object that is used to extract key values from ++ * {@link InputEvent}s received by the aggregators holding this context object. ++ */ ++ public IndexSubset indexSubsetKeys; ++ /** ++ * The {@link IndexSubset} object that is used to extract aggregate values ++ * from {@link InputEvent}s received by the aggregators holding this context object. ++ */ ++ public IndexSubset indexSubsetAggregates; ++ ++ /** ++ * Creates an empty conversion context. The constructor assigns nothing, so every ++ * field keeps its Java default value until it is set through its public field. ++ */ ++ public DimensionsConversionContext() ++ { ++ //Do nothing. ++ } ++} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java ---------------------------------------------------------------------- diff --cc library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java index 0000000,0000000..e593112 new file mode 100644 --- /dev/null +++ b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java @@@ -1,0 -1,0 +1,420 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++package com.datatorrent.lib.dimensions; ++ ++import java.io.Serializable; ++import java.util.Collections; ++import java.util.List; ++import java.util.Map; ++import java.util.Set; ++import java.util.concurrent.TimeUnit; ++ ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import com.google.common.base.Preconditions; ++import com.google.common.collect.ImmutableSet; ++import com.google.common.collect.Maps; ++import com.google.common.collect.Sets; ++ ++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; ++import com.datatorrent.lib.appdata.schemas.Fields; ++import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; ++import com.datatorrent.lib.appdata.schemas.TimeBucket; ++import com.datatorrent.lib.appdata.schemas.Type; ++ ++/** ++ * <p> ++ * This class defines a dimensions combination which is used by dimensions computation operators ++ * and stores. A dimension combination is composed of the names of the fields that constitute the key, ++ * as well as the TimeBucket under which data is stored. ++ * </p> ++ * <p> ++ * This class supports the creation of a dimensions combination from a {@link TimeBucket} object and a set of fields. ++ * It also supports the creation of a dimensions combination from an aggregation string. 
An aggregation string looks like ++ * the following: ++ * <br/> ++ * <br/> ++ * {@code ++ * "time=MINUTES:publisher:advertiser" ++ * } ++ * <br/> ++ * <br/> ++ * In the example above <b>"time=MINUTES"</b> represents a time bucket, and the other colon separated strings represent ++ * the names of the fields which comprise the key for this dimension combination. When specifying a time bucket in an ++ * aggregation string you must use the name of one of the TimeUnit enums. ++ * </p> ++ * <p> ++ * One of the primary uses of a {@link DimensionsDescriptor} is for querying a dimensional data store. When a query is ++ * received for a dimensional data store, the query must be mapped to many things including a dimensionsDescriptorID. The ++ * dimensionsDescriptorID is an id assigned to a class of dimension combinations which share the same keys. This ++ * mapping is ++ * performed by creating a ++ * {@link DimensionsDescriptor} object from the query, and then using the {@link DimensionsDescriptor} object ++ * to look up the correct dimensionsDescriptorID. This lookup to retrieve a dimensionsDescriptorID is necessary ++ * because a ++ * dimensionsDescriptorID is used for storage in order to prevent key conflicts. ++ * </p> ++ * ++ */ ++public class DimensionsDescriptor implements Serializable, Comparable<DimensionsDescriptor> ++{ ++ private static final long serialVersionUID = 201506251237L; ++ ++ /** ++ * Name of the reserved time field. ++ */ ++ public static final String DIMENSION_TIME = "time"; ++ /** ++ * Type of the reserved time field. ++ */ ++ public static final Type DIMENSION_TIME_TYPE = Type.LONG; ++ /** ++ * Name of the reserved time bucket field. ++ */ ++ public static final String DIMENSION_TIME_BUCKET = "timeBucket"; ++ /** ++ * Type of the reserved time bucket field. ++ */ ++ public static final Type DIMENSION_TIME_BUCKET_TYPE = Type.INTEGER; ++ /** ++ * The set of fields used for time, which are intended to be queried. 
Note that the ++ * timeBucket field is not included here because it is not intended to be queried. ++ */ ++ public static final Fields TIME_FIELDS = new Fields(Sets.newHashSet(DIMENSION_TIME)); ++ /** ++ * This set represents the field names which cannot be part of the user defined field names in a schema for ++ * dimensions computation. ++ */ ++ public static final Set<String> RESERVED_DIMENSION_NAMES = ImmutableSet.of(DIMENSION_TIME, ++ DIMENSION_TIME_BUCKET); ++ /** ++ * This is the equals string separator used when defining a time bucket for a dimensions combination. ++ */ ++ public static final String DELIMETER_EQUALS = "="; ++ /** ++ * This separates dimensions in the dimensions combination. ++ */ ++ public static final String DELIMETER_SEPERATOR = ":"; ++ /** ++ * An unmodifiable map from each reserved time field name ({@link #DIMENSION_TIME} and ++ * {@link #DIMENSION_TIME_BUCKET}) to its type. It is populated by the static initializer below. ++ */ ++ public static final Map<String, Type> DIMENSION_FIELD_TO_TYPE; ++ ++ /** ++ * The time bucket used for this dimension combination. ++ */ ++ private TimeBucket timeBucket; ++ /** ++ * The custom time bucket used for this dimension combination. ++ */ ++ private CustomTimeBucket customTimeBucket; ++ /** ++ * The set of key fields which compose this dimension combination. ++ */ ++ private Fields fields; ++ ++ static { ++ Map<String, Type> dimensionFieldToType = Maps.newHashMap(); ++ ++ dimensionFieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE); ++ dimensionFieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE); ++ ++ DIMENSION_FIELD_TO_TYPE = Collections.unmodifiableMap(dimensionFieldToType); ++ } ++ ++ /** ++ * Constructor for kryo serialization. ++ */ ++ private DimensionsDescriptor() ++ { ++ //for kryo ++ } ++ ++ /** ++ * Creates a dimensions descriptor (dimensions combination) with the given {@link TimeBucket} and key fields. ++ * ++ * @param timeBucket The {@link TimeBucket} that this dimensions combination represents. ++ * @param fields The key fields included in this dimensions combination. 
++ * @deprecated use ++ * {@link #DimensionsDescriptor(com.datatorrent.lib.appdata.schemas.CustomTimeBucket, ++ * com.datatorrent.lib.appdata.schemas.Fields)} instead. ++ */ ++ @Deprecated ++ public DimensionsDescriptor(TimeBucket timeBucket, ++ Fields fields) ++ { ++ setTimeBucket(timeBucket); ++ setFields(fields); ++ } ++ ++ /** ++ * Creates a dimensions descriptor (dimensions combination) with the given {@link CustomTimeBucket} and key fields. ++ * ++ * @param timeBucket The {@link CustomTimeBucket} that this dimensions combination represents. ++ * @param fields The key fields included in this dimensions combination. ++ */ ++ public DimensionsDescriptor(CustomTimeBucket timeBucket, ++ Fields fields) ++ { ++ setCustomTimeBucket(timeBucket); ++ setFields(fields); ++ } ++ ++ /** ++ * Creates a dimensions descriptor (dimensions combination) with the given key fields. No time bucket ++ * is set by this constructor. ++ * ++ * @param fields The key fields included in this dimensions combination. ++ */ ++ public DimensionsDescriptor(Fields fields) ++ { ++ setFields(fields); ++ } ++ ++ /** ++ * This constructor creates a dimensions descriptor (dimensions combination) from the given aggregation string. ++ * ++ * @param aggregationString The aggregation string to use when initializing this dimensions combination. ++ */ ++ public DimensionsDescriptor(String aggregationString) ++ { ++ initialize(aggregationString); ++ } ++ ++ /** ++ * Initializes the dimensions combination with the given aggregation string. ++ * ++ * @param aggregationString The aggregation string with which to initialize this dimensions combination. 
++ */ ++ private void initialize(String aggregationString) ++ { ++ String[] fieldArray = aggregationString.split(DELIMETER_SEPERATOR); ++ Set<String> fieldSet = Sets.newHashSet(); ++ ++ for (String field : fieldArray) { ++ String[] fieldAndValue = field.split(DELIMETER_EQUALS); ++ String fieldName = fieldAndValue[0]; ++ ++ if (fieldName.equals(DIMENSION_TIME_BUCKET)) { ++ throw new IllegalArgumentException(DIMENSION_TIME_BUCKET + " is an invalid time."); ++ } ++ ++ if (!fieldName.equals(DIMENSION_TIME)) { ++ fieldSet.add(fieldName); ++ } ++ ++ if (fieldName.equals(DIMENSION_TIME)) { ++ if (timeBucket != null) { ++ throw new IllegalArgumentException("Cannot specify time in a dimensions " ++ + "descriptor when a timebucket is also " ++ + "specified."); ++ } ++ ++ if (fieldAndValue.length == 2) { ++ ++ timeBucket = TimeBucket.TIME_UNIT_TO_TIME_BUCKET.get(TimeUnit.valueOf(fieldAndValue[1])); ++ } ++ } ++ } ++ ++ fields = new Fields(fieldSet); ++ } ++ ++ /** ++ * This is a helper method which sets and validates the {@link TimeBucket}. ++ * ++ * @param timeBucket The {@link TimeBucket} to set and validate. ++ */ ++ private void setTimeBucket(TimeBucket timeBucket) ++ { ++ Preconditions.checkNotNull(timeBucket); ++ this.timeBucket = timeBucket; ++ this.customTimeBucket = new CustomTimeBucket(timeBucket); ++ } ++ ++ /** ++ * This is a helper method which sets and validates the {@link CustomTimeBucket}. ++ * ++ * @param customTimeBucket The {@link CustomTimeBucket} to set and validate. ++ */ ++ private void setCustomTimeBucket(CustomTimeBucket customTimeBucket) ++ { ++ Preconditions.checkNotNull(customTimeBucket); ++ this.customTimeBucket = customTimeBucket; ++ this.timeBucket = customTimeBucket.getTimeBucket(); ++ } ++ ++ /** ++ * Gets the {@link TimeBucket} for this {@link DimensionsDescriptor} object. ++ * ++ * @return The {@link TimeBucket} for this {@link DimensionsDescriptor} object. ++ * @deprecated use {@link #getCustomTimeBucket()} instead. 
++ */ ++ @Deprecated ++ public TimeBucket getTimeBucket() ++ { ++ return timeBucket; ++ } ++ ++ /** ++ * Gets the {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object. ++ * ++ * @return The {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object. ++ */ ++ public CustomTimeBucket getCustomTimeBucket() ++ { ++ return customTimeBucket; ++ } ++ ++ /** ++ * This is a helper method which sets and validates the set of key fields for this ++ * {@link DimensionsDescriptor} object. ++ * ++ * @param fields The set of key fields for this {@link DimensionsDescriptor} object. ++ */ ++ private void setFields(Fields fields) ++ { ++ Preconditions.checkNotNull(fields); ++ this.fields = fields; ++ } ++ ++ /** ++ * Returns the set of key fields for this {@link DimensionsDescriptor} object. ++ * ++ * @return The set of key fields for this {@link DimensionsDescriptor} object. ++ */ ++ public Fields getFields() ++ { ++ return fields; ++ } ++ ++ /** ++ * This method is used to create a new {@link FieldsDescriptor} object representing this ++ * {@link DimensionsDescriptor} object from another {@link FieldsDescriptor} object which ++ * defines the names and types of all the available key fields. ++ * ++ * @param parentDescriptor The {@link FieldsDescriptor} object which defines the name and ++ * type of all the available key fields. ++ * @return A {@link FieldsDescriptor} object which represents this {@link DimensionsDescriptor} (dimensions ++ * combination) ++ * derived from the given {@link FieldsDescriptor} object. 
++ */
  public FieldsDescriptor createFieldsDescriptor(FieldsDescriptor parentDescriptor)
  {
    Map<String, Type> fieldToType = Maps.newHashMap();
    Map<String, Type> parentFieldToType = parentDescriptor.getFieldToType();

    for (String field : this.fields.getFields()) {
      // The reserved time fields are added below with their fixed types, never taken from the parent.
      if (RESERVED_DIMENSION_NAMES.contains(field)) {
        continue;
      }

      fieldToType.put(field, parentFieldToType.get(field));
    }

    if (timeBucket != null && timeBucket != TimeBucket.ALL) {
      fieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE);
      fieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE);
    }

    return new FieldsDescriptor(fieldToType);
  }

  @Override
  public int hashCode()
  {
    int hash = 7;
    hash = 83 * hash + (this.customTimeBucket != null ? this.customTimeBucket.hashCode() : 0);
    hash = 83 * hash + (this.fields != null ? this.fields.hashCode() : 0);
    return hash;
  }

  /**
   * Two descriptors are equal when they have equal custom time buckets and equal key fields. Either
   * may be null (a descriptor built from {@link Fields} only has no time bucket); the comparison is
   * null safe, matching {@link #hashCode()}.
   *
   * @param obj The object to compare with.
   * @return True if the given object is an equal {@link DimensionsDescriptor}.
   */
  @Override
  public boolean equals(Object obj)
  {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }

    final DimensionsDescriptor other = (DimensionsDescriptor)obj;

    // The original dereferenced this.customTimeBucket unconditionally and threw a
    // NullPointerException for descriptors without a time bucket.
    boolean sameBucket = this.customTimeBucket == null
        ? other.customTimeBucket == null
        : this.customTimeBucket.equals(other.customTimeBucket);
    boolean sameFields = this.fields == null
        ? other.fields == null
        : this.fields.equals(other.fields);

    return sameBucket && sameFields;
  }

  @Override
  public String toString()
  {
    return "DimensionsDescriptor{" + "timeBucket=" + customTimeBucket + ", fields=" + fields + '}';
  }

  /**
   * Orders descriptors by the number of key fields, then by the sorted key field names compared
   * pairwise, then by custom time bucket, with a missing time bucket ordered first.
   *
   * @param other The descriptor to compare with.
   * @return A negative, zero or positive value as this descriptor is less than, equal to or greater
   * than the given one.
   */
  @Override
  public int compareTo(DimensionsDescriptor other)
  {
    if (this == other) {
      return 0;
    }

    List<String> thisFieldList = this.getFields().getFieldsList();
    List<String> otherFieldList = other.getFields().getFieldsList();

    if (thisFieldList != otherFieldList) {
      int compare = Integer.compare(thisFieldList.size(), otherFieldList.size());

      if (compare != 0) {
        return compare;
      }

      // Sort copies: a comparison must not reorder the lists owned by the Fields objects.
      // NOTE(review): assumes no caller relied on compareTo leaving those lists sorted - confirm.
      List<String> thisSorted = new java.util.ArrayList<>(thisFieldList);
      List<String> otherSorted = new java.util.ArrayList<>(otherFieldList);
      Collections.sort(thisSorted);
      Collections.sort(otherSorted);

      for (int index = 0; index < thisSorted.size(); index++) {
        int fieldCompare = thisSorted.get(index).compareTo(otherSorted.get(index));

        if (fieldCompare != 0) {
          return fieldCompare;
        }
      }
    }

    CustomTimeBucket thisBucket = this.getCustomTimeBucket();
    CustomTimeBucket otherBucket = other.getCustomTimeBucket();

    if (thisBucket == null) {
      return otherBucket == null ? 0 : -1;
    }

    if (otherBucket == null) {
      return 1;
    }

    return thisBucket.compareTo(otherBucket);
  }

  private static final Logger LOG = LoggerFactory.getLogger(DimensionsDescriptor.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java ---------------------------------------------------------------------- diff --cc library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java index 0000000,0000000..5c4feed new file mode 100644 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistryTest.java @@@ -1,0 -1,0 +1,87 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. 
You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++package com.datatorrent.lib.dimensions; ++ ++import org.junit.Assert; ++import org.junit.Test; ++ ++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; ++import com.datatorrent.lib.appdata.schemas.TimeBucket; ++import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry; ++ ++ ++
/**
 * Tests registration and lookup of time buckets in {@link CustomTimeBucketRegistry}.
 */
public class CustomTimeBucketRegistryTest
{
  /**
   * The unit time buckets every test registers, each under its own ordinal.
   */
  private static final TimeBucket[] UNIT_BUCKETS = {TimeBucket.MINUTE, TimeBucket.HOUR, TimeBucket.DAY};

  /**
   * Registers one unit {@link CustomTimeBucket} per entry of {@link #UNIT_BUCKETS}, using the ordinal
   * of the entry as the ID.
   *
   * @param registry The registry to populate.
   * @return The registered custom time buckets, in the order of {@link #UNIT_BUCKETS}.
   */
  private static CustomTimeBucket[] registerUnitBuckets(CustomTimeBucketRegistry registry)
  {
    CustomTimeBucket[] registered = new CustomTimeBucket[UNIT_BUCKETS.length];

    for (int index = 0; index < UNIT_BUCKETS.length; index++) {
      registered[index] = new CustomTimeBucket(UNIT_BUCKETS[index]);
    }

    for (int index = 0; index < UNIT_BUCKETS.length; index++) {
      registry.register(registered[index], UNIT_BUCKETS[index].ordinal());
    }

    return registered;
  }

  /**
   * Buckets registered under explicit IDs can be found by ID, and their IDs can be found by bucket.
   */
  @Test
  public void testBuildingRegistry()
  {
    CustomTimeBucketRegistry registry = new CustomTimeBucketRegistry();
    CustomTimeBucket[] registered = registerUnitBuckets(registry);

    for (TimeBucket unit : UNIT_BUCKETS) {
      CustomTimeBucket found = registry.getTimeBucket(unit.ordinal());
      Assert.assertTrue(found.isUnit());
      Assert.assertEquals(unit, found.getTimeBucket());
    }

    for (int index = 0; index < UNIT_BUCKETS.length; index++) {
      Assert.assertEquals(UNIT_BUCKETS[index].ordinal(), (int)registry.getTimeBucketId(registered[index]));
    }
  }

  /**
   * A bucket registered without an ID receives the ID following the largest explicitly registered one.
   */
  @Test
  public void testRegister()
  {
    CustomTimeBucketRegistry registry = new CustomTimeBucketRegistry();
    registerUnitBuckets(registry);

    int largestOrdinal = Integer.MIN_VALUE;

    for (TimeBucket unit : UNIT_BUCKETS) {
      largestOrdinal = Math.max(largestOrdinal, unit.ordinal());
    }

    CustomTimeBucket fiveMinutes = new CustomTimeBucket(TimeBucket.MINUTE, 5L);
    registry.register(fiveMinutes);

    int assignedId = registry.getTimeBucketId(fiveMinutes);

    Assert.assertEquals(largestOrdinal + 1, assignedId);
  }

}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a4718f02/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java ---------------------------------------------------------------------- diff --cc library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java index 0000000,0000000..54682b1 new file mode 100644 --- /dev/null +++ b/library/src/test/java/com/datatorrent/lib/dimensions/DimensionsDescriptorTest.java @@@ -1,0 -1,0 +1,101 @@@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. 
The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. ++ */ ++package com.datatorrent.lib.dimensions; ++ ++import java.util.Set; ++import java.util.concurrent.TimeUnit; ++ ++import com.google.common.collect.Sets; ++ ++import org.junit.Assert; ++import org.junit.Test; ++ ++import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; ++import com.datatorrent.lib.appdata.schemas.Fields; ++import com.datatorrent.lib.appdata.schemas.TimeBucket; ++import com.datatorrent.lib.appdata.schemas.Type; ++import com.datatorrent.lib.dimensions.DimensionsDescriptor; ++ ++
/**
 * Tests parsing of aggregation strings by {@link DimensionsDescriptor} and its equality.
 */
public class DimensionsDescriptorTest
{
  public static final String KEY_1_NAME = "key1";
  public static final Type KEY_1_TYPE = Type.INTEGER;
  public static final String KEY_2_NAME = "key2";
  public static final Type KEY_2_TYPE = Type.STRING;

  public static final String AGG_1_NAME = "agg1";
  public static final Type AGG_1_TYPE = Type.INTEGER;
  public static final String AGG_2_NAME = "agg2";
  public static final Type AGG_2_TYPE = Type.STRING;

  /**
   * Asserts that the key fields of the given descriptor are exactly the given names.
   *
   * @param descriptor    The descriptor to check.
   * @param expectedNames The expected key field names.
   */
  private static void assertKeyFields(DimensionsDescriptor descriptor, String... expectedNames)
  {
    Set<String> expected = Sets.newHashSet(expectedNames);
    Assert.assertEquals("The fields should match.", expected, descriptor.getFields().getFields());
  }

  /**
   * A single key and no time yields one key field and no time bucket.
   */
  @Test
  public void simpleTest1()
  {
    DimensionsDescriptor descriptor = new DimensionsDescriptor(KEY_1_NAME);

    assertKeyFields(descriptor, KEY_1_NAME);
    Assert.assertEquals("The timeunit should be null.", null, descriptor.getTimeBucket());
  }

  /**
   * Two keys and no time yields two key fields and no time bucket.
   */
  @Test
  public void simpleTest2()
  {
    String aggregationString = KEY_1_NAME
        + DimensionsDescriptor.DELIMETER_SEPERATOR
        + KEY_2_NAME;
    DimensionsDescriptor descriptor = new DimensionsDescriptor(aggregationString);

    assertKeyFields(descriptor, KEY_1_NAME, KEY_2_NAME);
    Assert.assertEquals("The timeunit should be null.", null, descriptor.getTimeBucket());
  }

  /**
   * A key together with "time=DAYS" yields one key field and a time bucket whose unit is DAYS.
   */
  @Test
  public void simpleTimeTest()
  {
    String aggregationString = KEY_1_NAME
        + DimensionsDescriptor.DELIMETER_SEPERATOR
        + DimensionsDescriptor.DIMENSION_TIME
        + DimensionsDescriptor.DELIMETER_EQUALS
        + "DAYS";
    DimensionsDescriptor descriptor = new DimensionsDescriptor(aggregationString);

    assertKeyFields(descriptor, KEY_1_NAME);
    Assert.assertEquals("The timeunit should be DAYS.", TimeUnit.DAYS,
        descriptor.getTimeBucket().getTimeUnit());
  }

  /**
   * Descriptors built from equal custom time buckets and equal fields are equal.
   */
  @Test
  public void equalsAndHashCodeTest()
  {
    DimensionsDescriptor first = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
        new Fields(Sets.newHashSet("a", "b")));
    DimensionsDescriptor second = new DimensionsDescriptor(new CustomTimeBucket(TimeBucket.MINUTE, 5L),
        new Fields(Sets.newHashSet("a", "b")));

    Assert.assertTrue(second.equals(first));
  }
}
