APEXMALHAR-1991 #resolve #comment Move Dimensions Computation Classes to org.apache.apex.malhar package
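A minimal sketch of the resulting import change for downstream code, assuming a hypothetical consumer class (only the package prefix changes; the DimensionsDescriptor constructor and the aggregation-string format are taken from the Javadoc in the diff below):

    // Old location, removed by this commit:
    // import com.datatorrent.lib.dimensions.DimensionsDescriptor;

    // New location, added by this commit:
    import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor;

    // Hypothetical consumer class, for illustration only.
    public class DimensionsImportExample
    {
      // "time=MINUTES:publisher:advertiser" follows the aggregation-string format
      // documented in DimensionsDescriptor's Javadoc in the diff below.
      private final DimensionsDescriptor descriptor =
          new DimensionsDescriptor("time=MINUTES:publisher:advertiser");
    }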
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c5cab8bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c5cab8bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c5cab8bd

Branch: refs/heads/master
Commit: c5cab8bd53913d76cbd0a6c37d56a9cca6968c89
Parents: 79eeff7
Author: Timothy Farkas <[email protected]>
Authored: Mon Mar 28 15:37:11 2016 -0700
Committer: Timothy Farkas <[email protected]>
Committed: Mon Mar 28 15:37:11 2016 -0700

----------------------------------------------------------------------
 .../schemas/DimensionalConfigurationSchema.java |  13 +-
 .../lib/appdata/schemas/DimensionalSchema.java  |   5 +-
 .../db/jdbc/JDBCDimensionalOutputOperator.java  |   8 +-
 .../dimensions/CustomTimeBucketRegistry.java    | 141 ---
 .../dimensions/DimensionsConversionContext.java | 116 ---
 .../lib/dimensions/DimensionsDescriptor.java    | 422 ---
 .../lib/dimensions/DimensionsEvent.java         | 844 ------------------
 .../AbstractIncrementalAggregator.java          | 190 -----
 .../dimensions/aggregator/AggregateEvent.java   |  38 -
 .../aggregator/AggregatorAverage.java           | 146 ----
 .../dimensions/aggregator/AggregatorCount.java  | 128 ---
 .../dimensions/aggregator/AggregatorCumSum.java | 233 -----
 .../dimensions/aggregator/AggregatorFirst.java  |  84 --
 .../aggregator/AggregatorIncrementalType.java   |  79 --
 .../dimensions/aggregator/AggregatorLast.java   |  84 --
 .../dimensions/aggregator/AggregatorMax.java    | 265 ------
 .../dimensions/aggregator/AggregatorMin.java    | 265 ------
 .../aggregator/AggregatorOTFType.java           |  89 --
 .../aggregator/AggregatorRegistry.java          | 424 ----------
 .../dimensions/aggregator/AggregatorSum.java    | 254 ------
 .../dimensions/aggregator/AggregatorUtils.java  | 148 ----
 .../aggregator/IncrementalAggregator.java       |  70 --
 .../dimensions/aggregator/OTFAggregator.java    |  84 --
 .../lib/dimensions/package-info.java            |  20 -
 .../dimensions/CustomTimeBucketRegistry.java    | 139 +++
 .../dimensions/DimensionsConversionContext.java | 116 +++
 .../lib/dimensions/DimensionsDescriptor.java    | 447 ++++++++++
 .../malhar/lib/dimensions/DimensionsEvent.java  | 848 +++++++++++++++++++
 .../AbstractIncrementalAggregator.java          | 191 +++++
 .../dimensions/aggregator/AggregateEvent.java   |  38 +
 .../aggregator/AggregatorAverage.java           | 146 ++++
 .../dimensions/aggregator/AggregatorCount.java  | 129 +++
 .../dimensions/aggregator/AggregatorCumSum.java | 234 +++++
 .../dimensions/aggregator/AggregatorFirst.java  |  85 ++
 .../aggregator/AggregatorIncrementalType.java   |  79 ++
 .../dimensions/aggregator/AggregatorLast.java   |  85 ++
 .../dimensions/aggregator/AggregatorMax.java    | 266 ++++++
 .../dimensions/aggregator/AggregatorMin.java    | 266 ++++++
 .../aggregator/AggregatorOTFType.java           |  89 ++
 .../aggregator/AggregatorRegistry.java          | 424 ++++++++++
 .../dimensions/aggregator/AggregatorSum.java    | 255 ++++++
 .../dimensions/aggregator/AggregatorUtils.java  | 148 ++++
 .../aggregator/IncrementalAggregator.java       |  71 ++
 .../dimensions/aggregator/OTFAggregator.java    |  84 ++
 .../malhar/lib/dimensions/package-info.java     |  20 +
 .../CustomTimeBucketRegistryTest.java           |   4 +-
 .../appdata/dimensions/DimensionsEventTest.java |   7 +-
 .../DimensionalConfigurationSchemaTest.java     |  23 +-
 .../appdata/schemas/DimensionalSchemaTest.java  |  15 +-
 .../CustomTimeBucketRegistryTest.java           |  87 --
 .../dimensions/DimensionsDescriptorTest.java    | 101 ---
 .../CustomTimeBucketRegistryTest.java           |  87 ++
 .../dimensions/DimensionsDescriptorTest.java    | 102 +++
 53
files changed, 4387 insertions(+), 4349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java index 28fa119..1e048c4 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalConfigurationSchema.java @@ -26,6 +26,12 @@ import java.util.Set; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.dimensions.CustomTimeBucketRegistry; +import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor; +import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry; +import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorUtils; +import org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator; +import org.apache.apex.malhar.lib.dimensions.aggregator.OTFAggregator; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -39,13 +45,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.datatorrent.lib.dimensions.CustomTimeBucketRegistry; -import com.datatorrent.lib.dimensions.DimensionsDescriptor; -import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry; -import com.datatorrent.lib.dimensions.aggregator.AggregatorUtils; -import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator; -import com.datatorrent.lib.dimensions.aggregator.OTFAggregator; - import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; import it.unimi.dsi.fastutil.ints.IntArrayList; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java index 6639d3a..30f2c1e 100644 --- a/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java +++ b/library/src/main/java/com/datatorrent/lib/appdata/schemas/DimensionalSchema.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry; +import org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONException; import org.codehaus.jettison.json.JSONObject; @@ -35,9 +37,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry; -import com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator; - /** * The {@link DimensionalSchema} class represents the App Data dimensions schema. 
The App Data dimensions * schema is built from two sources: a {@link DimensionalConfigurationSchema} and an optional schema stub. The http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java index 3021521..353f1b2 100644 --- a/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JDBCDimensionalOutputOperator.java @@ -28,6 +28,10 @@ import java.util.Map; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.dimensions.DimensionsDescriptor; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.Aggregate; +import org.apache.apex.malhar.lib.dimensions.DimensionsEvent.EventKey; +import org.apache.apex.malhar.lib.dimensions.aggregator.AggregatorRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,10 +46,6 @@ import com.datatorrent.lib.appdata.schemas.DimensionalConfigurationSchema; import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; import com.datatorrent.lib.appdata.schemas.Type; import com.datatorrent.lib.db.AbstractPassThruTransactionableStoreOutputOperator; -import com.datatorrent.lib.dimensions.DimensionsDescriptor; -import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; -import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey; -import com.datatorrent.lib.dimensions.aggregator.AggregatorRegistry; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java b/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java deleted file mode 100644 index 0e76509..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/CustomTimeBucketRegistry.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.dimensions; - -import java.io.Serializable; -import java.util.HashMap; -import java.util.Map; - -import com.google.common.base.Preconditions; - -import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; - -import it.unimi.dsi.fastutil.ints.Int2ObjectMap; -import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import it.unimi.dsi.fastutil.objects.Object2IntMap; -import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; - -/** - * @since 3.3.0 - */ -public class CustomTimeBucketRegistry implements Serializable -{ - private static final long serialVersionUID = 201509221536L; - - private int currentId; - - private Int2ObjectMap<CustomTimeBucket> idToTimeBucket = new Int2ObjectOpenHashMap<>(); - private Object2IntMap<CustomTimeBucket> timeBucketToId = new Object2IntOpenHashMap<>(); - private Map<String, CustomTimeBucket> textToTimeBucket = new HashMap<>(); - - public CustomTimeBucketRegistry() - { - } - - public CustomTimeBucketRegistry(int startingId) - { - this.currentId = startingId; - } - - public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket) - { - initialize(idToTimeBucket); - } - - public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket, - int startingId) - { - int tempId = initialize(idToTimeBucket); - - Preconditions.checkArgument(tempId < startingId, "The statingId " + startingId - + " must be larger than the largest ID " + tempId - + " in the given idToTimeBucket mapping"); - - this.idToTimeBucket = Preconditions.checkNotNull(idToTimeBucket); - this.currentId = startingId; - } - - private int initialize(Int2ObjectMap<CustomTimeBucket> idToTimeBucket) - { - Preconditions.checkNotNull(idToTimeBucket); - - int tempId = Integer.MIN_VALUE; - - for (int timeBucketId : idToTimeBucket.keySet()) { - tempId = Math.max(tempId, timeBucketId); - CustomTimeBucket customTimeBucket = idToTimeBucket.get(timeBucketId); - textToTimeBucket.put(customTimeBucket.getText(), customTimeBucket); - Preconditions.checkNotNull(customTimeBucket); - timeBucketToId.put(customTimeBucket, timeBucketId); - } - - return tempId; - } - - public CustomTimeBucket getTimeBucket(int timeBucketId) - { - return idToTimeBucket.get(timeBucketId); - } - - public Integer getTimeBucketId(CustomTimeBucket timeBucket) - { - if (!timeBucketToId.containsKey(timeBucket)) { - return null; - } - - return timeBucketToId.get(timeBucket); - } - - public CustomTimeBucket getTimeBucket(String text) - { - return textToTimeBucket.get(text); - } - - public void register(CustomTimeBucket timeBucket) - { - register(timeBucket, currentId); - } - - public void register(CustomTimeBucket timeBucket, int timeBucketId) - { - if (timeBucketToId.containsKey(timeBucket)) { - throw new IllegalArgumentException("The timeBucket " + timeBucket + " is already registered."); - } - - if (timeBucketToId.containsValue(timeBucketId)) { - throw new IllegalArgumentException("The timeBucketId " + timeBucketId + " is already registered."); - } - - idToTimeBucket.put(timeBucketId, timeBucket); - timeBucketToId.put(timeBucket, timeBucketId); - - if (timeBucketId >= currentId) { - currentId = timeBucketId + 1; - } - - textToTimeBucket.put(timeBucket.getText(), timeBucket); - } - - @Override - public String toString() - { - return "CustomTimeBucketRegistry{" + "idToTimeBucket=" + idToTimeBucket + '}'; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java 
---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java deleted file mode 100644 index dd598ff..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsConversionContext.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.dimensions; - -import java.io.Serializable; - -import com.datatorrent.lib.appdata.gpo.GPOUtils.IndexSubset; -import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; - -/** - * This is a context object used to convert {@link InputEvent}s into aggregates - * in {@link IncrementalAggregator}s. - * - * @since 3.3.0 - */ -public class DimensionsConversionContext implements Serializable -{ - private static final long serialVersionUID = 201506151157L; - - public CustomTimeBucketRegistry customTimeBucketRegistry; - /** - * The schema ID for {@link Aggregate}s emitted by the - * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s - * holding this context. - */ - public int schemaID; - /** - * The dimensionsDescriptor ID for {@link Aggregate}s emitted by the - * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s - * holding this context. - */ - public int dimensionsDescriptorID; - /** - * The aggregator ID for {@link Aggregate}s emitted by the - * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s - * holding this context. - */ - public int aggregatorID; - /** - * The {@link DimensionsDescriptor} corresponding to the given dimension - * descriptor id. - */ - public DimensionsDescriptor dd; - /** - * The {@link FieldsDescriptor} for the aggregate of the {@link Aggregate}s - * emitted by the - * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s - * holding this context object. - */ - public FieldsDescriptor aggregateDescriptor; - /** - * The {@link FieldsDescriptor} for the key of the {@link Aggregate}s emitted - * by the - * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s - * holding this context object. - */ - public FieldsDescriptor keyDescriptor; - /** - * The index of the timestamp field within the key of {@link InputEvent}s - * received by the - * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s - * holding this context object. This is -1 if the {@link InputEvent} key has - * no timestamp. 
- */ - public int inputTimestampIndex; - /** - * The index of the timestamp field within the key of {@link Aggregate}s - * emitted by the - * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s - * holding this context object. This is -1 if the {@link Aggregate}'s key has - * no timestamp. - */ - public int outputTimestampIndex; - /** - * The index of the time bucket field within the key of {@link Aggregate}s - * emitted by the - * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator}s - * holding this context object. This is -1 if the {@link Aggregate}'s key has - * no timebucket. - */ - public int outputTimebucketIndex; - /** - * The {@link IndexSubset} object that is used to extract key values from - * {@link InputEvent}s received by this aggregator. - */ - public IndexSubset indexSubsetKeys; - /** - * The {@link IndexSubset} object that is used to extract aggregate values - * from {@link InputEvent}s received by this aggregator. - */ - public IndexSubset indexSubsetAggregates; - - /** - * Constructor for creating conversion context. - */ - public DimensionsConversionContext() - { - //Do nothing. - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java deleted file mode 100644 index a1b6f96..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsDescriptor.java +++ /dev/null @@ -1,422 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.dimensions; - -import java.io.Serializable; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - -import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; -import com.datatorrent.lib.appdata.schemas.Fields; -import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; -import com.datatorrent.lib.appdata.schemas.TimeBucket; -import com.datatorrent.lib.appdata.schemas.Type; - -/** - * <p> - * This class defines a dimensions combination which is used by dimensions computation operators - * and stores. 
A dimension combination is composed of the names of the fields that constitute the key, - * as well as the TimeBucket under which data is stored. - * </p> - * <p> - * This class supports the creation of a dimensions combination from a {@link TimeBucket} object and a set of fields. - * It also supports the creation of a dimensions combination an aggregation string. An aggregation string looks like - * the following: - * <br/> - * <br/> - * {@code - * "time=MINUTES:publisher:advertiser" - * } - * <br/> - * <br/> - * In the example above <b>"time=MINUTES"</b> represents a time bucket, and the other colon separated strings represent - * the name of fields which comprise the key for this dimension combination. When specifiying a time bucket in an - * aggregation string you must use the name of one of the TimeUnit enums. - * </p> - * <p> - * One of the primary uses of a {@link DimensionsDescriptor} is for querying a dimensional data store. When a query is - * received for a dimensional data store, the query must be mapped to many things including a dimensionDescriptorID. The - * dimensionDescriptorID is an id assigned to a class of dimension combinations which share the same keys. This - * mapping is - * performed by creating a - * {@link DimensionsDescriptor} object from the query, and then using the {@link DimensionsDescriptor} object - * to look up the correct dimensionsDescriptorID. This lookup to retrieve a dimensionsDescriptorID is necessary - * because a - * dimensionsDescriptorID is used for storage in order to prevent key conflicts. - * </p> - * - * - * @since 3.3.0 - */ -public class DimensionsDescriptor implements Serializable, Comparable<DimensionsDescriptor> -{ - private static final long serialVersionUID = 201506251237L; - - /** - * Name of the reserved time field. - */ - public static final String DIMENSION_TIME = "time"; - /** - * Type of the reserved time field. - */ - public static final Type DIMENSION_TIME_TYPE = Type.LONG; - /** - * Name of the reserved time bucket field. - */ - public static final String DIMENSION_TIME_BUCKET = "timeBucket"; - /** - * Type of the reserved time bucket field. - */ - public static final Type DIMENSION_TIME_BUCKET_TYPE = Type.INTEGER; - /** - * The set of fields used for time, which are intended to be queried. Not that the - * timeBucket field is not included here because its not intended to be queried. - */ - public static final Fields TIME_FIELDS = new Fields(Sets.newHashSet(DIMENSION_TIME)); - /** - * This set represents the field names which cannot be part of the user defined field names in a schema for - * dimensions computation. - */ - public static final Set<String> RESERVED_DIMENSION_NAMES = ImmutableSet.of(DIMENSION_TIME, - DIMENSION_TIME_BUCKET); - /** - * This is the equals string separator used when defining a time bucket for a dimensions combination. - */ - public static final String DELIMETER_EQUALS = "="; - /** - * This separates dimensions in the dimensions combination. - */ - public static final String DELIMETER_SEPERATOR = ":"; - /** - * A map from a key field to its type. - */ - public static final Map<String, Type> DIMENSION_FIELD_TO_TYPE; - - /** - * The time bucket used for this dimension combination. - */ - private TimeBucket timeBucket; - /** - * The custom time bucket used for this dimension combination. - */ - private CustomTimeBucket customTimeBucket; - /** - * The set of key fields which compose this dimension combination. 
- */ - private Fields fields; - - static { - Map<String, Type> dimensionFieldToType = Maps.newHashMap(); - - dimensionFieldToType.put(DIMENSION_TIME, DIMENSION_TIME_TYPE); - dimensionFieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE); - - DIMENSION_FIELD_TO_TYPE = Collections.unmodifiableMap(dimensionFieldToType); - } - - /** - * Constructor for kryo serialization. - */ - private DimensionsDescriptor() - { - //for kryo - } - - /** - * Creates a dimensions descriptor (dimensions combination) with the given {@link TimeBucket} and key fields. - * - * @param timeBucket The {@link TimeBucket} that this dimensions combination represents. - * @param fields The key fields included in this dimensions combination. - * @deprecated use - * {@link #DimensionsDescriptor(com.datatorrent.lib.appdata.schemas.CustomTimeBucket, - * com.datatorrent.lib.appdata.schemas.Fields)} instead. - */ - @Deprecated - public DimensionsDescriptor(TimeBucket timeBucket, - Fields fields) - { - setTimeBucket(timeBucket); - setFields(fields); - } - - /** - * Creates a dimensions descriptor (dimensions combination) with the given {@link CustomTimeBucket} and key fields. - * - * @param timeBucket The {@link CustomTimeBucket} that this dimensions combination represents. - * @param fields The key fields included in this dimensions combination. - */ - public DimensionsDescriptor(CustomTimeBucket timeBucket, - Fields fields) - { - setCustomTimeBucket(timeBucket); - setFields(fields); - } - - /** - * Creates a dimensions descriptor (dimensions combination) with the given key fields. - * - * @param fields The key fields included in this dimensions combination. - */ - public DimensionsDescriptor(Fields fields) - { - setFields(fields); - } - - /** - * This construction creates a dimensions descriptor (dimensions combination) from the given aggregation string. - * - * @param aggregationString The aggregation string to use when initializing this dimensions combination. - */ - public DimensionsDescriptor(String aggregationString) - { - initialize(aggregationString); - } - - /** - * Initializes the dimensions combination with the given aggregation string. - * - * @param aggregationString The aggregation string with which to initialize this dimensions combination. - */ - private void initialize(String aggregationString) - { - String[] fieldArray = aggregationString.split(DELIMETER_SEPERATOR); - Set<String> fieldSet = Sets.newHashSet(); - - for (String field : fieldArray) { - String[] fieldAndValue = field.split(DELIMETER_EQUALS); - String fieldName = fieldAndValue[0]; - - if (fieldName.equals(DIMENSION_TIME_BUCKET)) { - throw new IllegalArgumentException(DIMENSION_TIME_BUCKET + " is an invalid time."); - } - - if (!fieldName.equals(DIMENSION_TIME)) { - fieldSet.add(fieldName); - } - - if (fieldName.equals(DIMENSION_TIME)) { - if (timeBucket != null) { - throw new IllegalArgumentException("Cannot specify time in a dimensions " - + "descriptor when a timebucket is also " - + "specified."); - } - - if (fieldAndValue.length == 2) { - - timeBucket = TimeBucket.TIME_UNIT_TO_TIME_BUCKET.get(TimeUnit.valueOf(fieldAndValue[1])); - } - } - } - - fields = new Fields(fieldSet); - } - - /** - * This is a helper method which sets and validates the {@link TimeBucket}. - * - * @param timeBucket The {@link TimeBucket} to set and validate. 
- */ - private void setTimeBucket(TimeBucket timeBucket) - { - Preconditions.checkNotNull(timeBucket); - this.timeBucket = timeBucket; - this.customTimeBucket = new CustomTimeBucket(timeBucket); - } - - /** - * This is a helper method which sets and validates the {@link CustomTimeBucket}. - * - * @param customTimeBucket The {@link CustomTimeBucket} to set and validate. - */ - private void setCustomTimeBucket(CustomTimeBucket customTimeBucket) - { - Preconditions.checkNotNull(customTimeBucket); - this.customTimeBucket = customTimeBucket; - this.timeBucket = customTimeBucket.getTimeBucket(); - } - - /** - * Gets the {@link TimeBucket} for this {@link DimensionsDescriptor} object. - * - * @return The {@link TimeBucket} for this {@link DimensionsDescriptor} object. - * @deprecated use {@link #getCustomTimeBucket()} instead. - */ - @Deprecated - public TimeBucket getTimeBucket() - { - return timeBucket; - } - - /** - * Gets the {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object. - * - * @return The {@link CustomTimeBucket} for this {@link DimensionsDescriptor} object. - */ - public CustomTimeBucket getCustomTimeBucket() - { - return customTimeBucket; - } - - /** - * This is a helper method which sets and validates the set of key fields for this - * {@link DimensionsDescriptor} object. - * - * @param fields The set of key fields for this {@link DimensionsDescriptor} object. - */ - private void setFields(Fields fields) - { - Preconditions.checkNotNull(fields); - this.fields = fields; - } - - /** - * Returns the set of key fields for this {@link DimensionsDescriptor} object. - * - * @return The set of key fields for this {@link DimensionsDescriptor} object. - */ - public Fields getFields() - { - return fields; - } - - /** - * This method is used to create a new {@link FieldsDescriptor} object representing this - * {@link DimensionsDescriptor} object from another {@link FieldsDescriptor} object which - * defines the names and types of all the available key fields. - * - * @param parentDescriptor The {@link FieldsDescriptor} object which defines the name and - * type of all the available key fields. - * @return A {@link FieldsDescriptor} object which represents this {@link DimensionsDescriptor} (dimensions - * combination) - * derived from the given {@link FieldsDescriptor} object. - */ - public FieldsDescriptor createFieldsDescriptor(FieldsDescriptor parentDescriptor) - { - Map<String, Type> fieldToType = Maps.newHashMap(); - Map<String, Type> parentFieldToType = parentDescriptor.getFieldToType(); - - for (String field : this.fields.getFields()) { - if (RESERVED_DIMENSION_NAMES.contains(field)) { - continue; - } - - fieldToType.put(field, parentFieldToType.get(field)); - } - - if (timeBucket != null && timeBucket != TimeBucket.ALL) { - fieldToType.put(DIMENSION_TIME_BUCKET, DIMENSION_TIME_BUCKET_TYPE); - fieldToType.put(DIMENSION_TIME, Type.LONG); - } - - return new FieldsDescriptor(fieldToType); - } - - @Override - public int hashCode() - { - int hash = 7; - hash = 83 * hash + (this.customTimeBucket != null ? this.customTimeBucket.hashCode() : 0); - hash = 83 * hash + (this.fields != null ? 
this.fields.hashCode() : 0); - return hash; - } - - @Override - public boolean equals(Object obj) - { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final DimensionsDescriptor other = (DimensionsDescriptor)obj; - if (!this.customTimeBucket.equals(other.customTimeBucket)) { - return false; - } - if (this.fields != other.fields && (this.fields == null || !this.fields.equals(other.fields))) { - return false; - } - return true; - } - - @Override - public String toString() - { - return "DimensionsDescriptor{" + "timeBucket=" + customTimeBucket + ", fields=" + fields + '}'; - } - - @Override - public int compareTo(DimensionsDescriptor other) - { - if (this == other) { - return 0; - } - - List<String> thisFieldList = this.getFields().getFieldsList(); - List<String> otherFieldList = other.getFields().getFieldsList(); - - if (thisFieldList != otherFieldList) { - int compare = thisFieldList.size() - otherFieldList.size(); - - if (compare != 0) { - return compare; - } - - Collections.sort(thisFieldList); - Collections.sort(otherFieldList); - - for (int index = 0; index < thisFieldList.size(); index++) { - String thisField = thisFieldList.get(index); - String otherField = otherFieldList.get(index); - - int fieldCompare = thisField.compareTo(otherField); - - if (fieldCompare != 0) { - return fieldCompare; - } - } - } - - CustomTimeBucket thisBucket = this.getCustomTimeBucket(); - CustomTimeBucket otherBucket = other.getCustomTimeBucket(); - - if (thisBucket == null && otherBucket == null) { - return 0; - } else if (thisBucket != null && otherBucket == null) { - return 1; - } else if (thisBucket == null && otherBucket != null) { - return -1; - } else { - return thisBucket.compareTo(otherBucket); - } - } - - private static final Logger LOG = LoggerFactory.getLogger(DimensionsDescriptor.class); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java b/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java deleted file mode 100644 index b12b631..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/DimensionsEvent.java +++ /dev/null @@ -1,844 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.dimensions; - -import java.io.Serializable; -import java.util.List; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -import com.datatorrent.lib.appdata.gpo.GPOMutable; -import com.datatorrent.lib.appdata.gpo.GPOUtils; -import com.datatorrent.lib.dimensions.aggregator.AggregateEvent; - -/** - * <p> - * This is the base class for the events that are used for internal processing in the subclasses of - * {@link AbstractDimensionsComputationFlexible} and {@link DimensionsStoreHDHT}. - * </p> - * <p> - * A {@link DimensionsEvent} is constructed from two parts: an {@link EventKey} and a {@link GPOMutable} object - * which contains the values of aggregate fields. The {@link EventKey} is used to identify the dimension combination - * an event belongs to, and consequently determines what input values should be aggregated together. The aggregates - * are the actual data payload of the event which are to be aggregated. - * </p> - * - * @since 3.1.0 - */ -public class DimensionsEvent implements Serializable -{ - private static final long serialVersionUID = 201503231204L; - - /** - * This is the {@link GPOMutable} object which holds all the aggregates. - */ - protected GPOMutable aggregates; - /** - * This is the event key for the event. - */ - protected EventKey eventKey; - - /** - * Constructor for Kryo. - */ - private DimensionsEvent() - { - //For kryo - } - - /** - * This creates a {@link DimensionsEvent} from the given event key and aggregates. - * - * @param eventKey The key from which to create a {@link DimensionsEvent}. - * @param aggregates The aggregates from which to create {@link DimensionsEvent}. - */ - public DimensionsEvent(EventKey eventKey, - GPOMutable aggregates) - { - setEventKey(eventKey); - setAggregates(aggregates); - } - - /** - * Creates a DimensionsEvent with the given key values, aggregates and ids. - * - * @param keys The values for fields in the key. - * @param aggregates The values for fields in the aggregate. - * @param bucketID The bucketID - * @param schemaID The schemaID. - * @param dimensionDescriptorID The dimensionsDescriptorID. - * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. - */ - public DimensionsEvent(GPOMutable keys, - GPOMutable aggregates, - int bucketID, - int schemaID, - int dimensionDescriptorID, - int aggregatorIndex) - { - this.eventKey = new EventKey(bucketID, - schemaID, - dimensionDescriptorID, - aggregatorIndex, - keys); - setAggregates(aggregates); - } - - /** - * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0. - * - * @param keys The value for fields in the key. - * @param aggregates The value for fields in the aggregate. - * @param schemaID The schemaID. - * @param dimensionDescriptorID The dimensionsDescriptorID. - * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. - */ - public DimensionsEvent(GPOMutable keys, - GPOMutable aggregates, - int schemaID, - int dimensionDescriptorID, - int aggregatorIndex) - { - this.eventKey = new EventKey(schemaID, - dimensionDescriptorID, - aggregatorIndex, - keys); - setAggregates(aggregates); - } - - /** - * This is a helper method which sets the {@link EventKey} of the event to - * be the same as the given {@link EventKey}. - * - * @param eventKey The {@link EventKey} to set on this event. 
- */ - protected final void setEventKey(EventKey eventKey) - { - this.eventKey = new EventKey(eventKey); - } - - /** - * This is a helper method which sets the aggregates for this event. - * - * @param aggregates The aggregates for this event. - */ - protected final void setAggregates(GPOMutable aggregates) - { - Preconditions.checkNotNull(aggregates); - this.aggregates = aggregates; - } - - /** - * This is a helper method which returns the aggregates for this event. - * - * @return The helper method which returns the aggregates for this event. - */ - public GPOMutable getAggregates() - { - return aggregates; - } - - /** - * Returns the {@link EventKey} for this event. - * - * @return The {@link EventKey} for this event. - */ - public EventKey getEventKey() - { - return eventKey; - } - - /** - * This is a convenience method which returns the values of the key fields in this event's - * {@link EventKey}. - * - * @return The values of the key fields in this event's {@link EventKey}. - */ - public GPOMutable getKeys() - { - return eventKey.getKey(); - } - - /** - * This is a convenience method which returns the schemaID of this event's {@link EventKey}. - * - * @return The schemaID of this event's {@link EventKey}. - */ - public int getSchemaID() - { - return eventKey.getSchemaID(); - } - - /** - * Returns the id of the dimension descriptor (key combination) for which this event contains data. - * - * @return The id of the dimension descriptor (key combination) for which this event contains data. - */ - public int getDimensionDescriptorID() - { - return eventKey.getDimensionDescriptorID(); - } - - /** - * Returns the id of the aggregator which is applied to this event's data. - * - * @return Returns the id of the aggregator which is applied to this event's data. - */ - public int getAggregatorID() - { - return eventKey.getAggregatorID(); - } - - /** - * Returns the bucketID assigned to this event. The bucketID is useful for this event in the case that the event - * is sent to a partitioned HDHT operator. Each partitioned HDHT operator can use the bucketIDs for the buckets it - * writes to as a partition key. - * - * @return The bucketID assigned to this event. - */ - public int getBucketID() - { - return eventKey.getBucketID(); - } - - /** - * This is a utility method which copies the given src event to the given destination event. - * - * @param aeDest The destination event. - * @param aeSrc The source event. 
- */ - public static void copy(DimensionsEvent aeDest, DimensionsEvent aeSrc) - { - GPOMutable destAggs = aeDest.getAggregates(); - GPOMutable srcAggs = aeSrc.getAggregates(); - - if (srcAggs.getFieldsBoolean() != null) { - System.arraycopy(srcAggs.getFieldsBoolean(), 0, destAggs.getFieldsBoolean(), 0, - srcAggs.getFieldsBoolean().length); - } - - if (srcAggs.getFieldsCharacter() != null) { - System.arraycopy(srcAggs.getFieldsCharacter(), 0, destAggs.getFieldsCharacter(), 0, - srcAggs.getFieldsCharacter().length); - } - - if (srcAggs.getFieldsString() != null) { - System.arraycopy(srcAggs.getFieldsString(), 0, destAggs.getFieldsString(), 0, srcAggs.getFieldsString().length); - } - - if (srcAggs.getFieldsShort() != null) { - System.arraycopy(srcAggs.getFieldsShort(), 0, destAggs.getFieldsShort(), 0, srcAggs.getFieldsShort().length); - } - - if (srcAggs.getFieldsInteger() != null) { - System.arraycopy(srcAggs.getFieldsInteger(), 0, destAggs.getFieldsInteger(), 0, - srcAggs.getFieldsInteger().length); - } - - if (srcAggs.getFieldsLong() != null) { - System.arraycopy(srcAggs.getFieldsLong(), 0, destAggs.getFieldsLong(), 0, srcAggs.getFieldsLong().length); - } - - if (srcAggs.getFieldsFloat() != null) { - System.arraycopy(srcAggs.getFieldsFloat(), 0, destAggs.getFieldsFloat(), 0, srcAggs.getFieldsFloat().length); - } - - if (srcAggs.getFieldsDouble() != null) { - System.arraycopy(srcAggs.getFieldsDouble(), 0, destAggs.getFieldsDouble(), 0, srcAggs.getFieldsDouble().length); - } - } - - /** - * <p> - * The {@link EventKey} represents a dimensions combination for a dimensions event. It contains the keys and values - * which define a dimensions combination. It's very similar to a {@link DimensionsDescriptor} which is also used to - * define part of a dimensions combination. The difference between the two is that a {@link DimensionsDescriptor} - * only contains what - * keys are included in the combination, not the values of those keys (which the {@link EventKey} has. - * </p> - * <p> - * In addition to holding the keys in a dimensions combination and their values, the event key holds some meta - * information. - * The meta information included and their purposes are the following: - * <ul> - * <li><b>bucketID:</b> This is set when the dimension store responsible for storing the data is partitioned. In that - * case the bucketID is used as the partitionID.</li> - * <li><b>schemaID:</b> This is the id of the {@link DimensionalSchema} that this {@link EventKey} corresponds to - * .</li> - * <li><b>dimensionDescriptorID:</b> This is the id of the {@link DimensionsDescriptor} that this {@link EventKey} - * corresponds to.</li> - * <li><b>aggregatorID:</b> This is the id of the aggregator that is used to aggregate the values associated with - * this {@link EventKey} - * in a {@link DimensionsEvent}.</li> - * </ul> - * </p> - */ - public static class EventKey implements Serializable - { - private static final long serialVersionUID = 201503231205L; - - /** - * The bucketID assigned to this event key. - */ - private int bucketID; - /** - * The schemaID corresponding to the {@link DimensionalSchema} that this {@link EventKey} - * corresponds to. - */ - private int schemaID; - /** - * The dimensionsDescriptorID of the {@link DimensionDescriptor} in the corresponding {@link DimensionalSchema}. - */ - private int dimensionDescriptorID; - /** - * The id of the aggregator which should be used to aggregate the values corresponding to this - * {@link EventKey}. 
- */ - private int aggregatorID; - /** - * The values of the key fields. - */ - private GPOMutable key; - - /** - * Constructor for serialization. - */ - private EventKey() - { - //For kryo - } - - /** - * Copy constructor. - * - * @param eventKey The {@link EventKey} whose data will be copied. - */ - public EventKey(EventKey eventKey) - { - this.bucketID = eventKey.bucketID; - this.schemaID = eventKey.schemaID; - this.dimensionDescriptorID = eventKey.dimensionDescriptorID; - this.aggregatorID = eventKey.aggregatorID; - - this.key = new GPOMutable(eventKey.getKey()); - } - - /** - * Creates an event key with the given data. - * - * @param bucketID The bucketID assigned to this {@link EventKey}. - * @param schemaID The schemaID of the corresponding {@link DimensionalSchema}. - * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding - * {@link DimensionDescriptor} in the {@link DimensionalSchema}. - * @param aggregatorID The id of the aggregator which should be used to aggregate the values - * corresponding to this - * {@link EventKey}. - * @param key The values of the keys. - */ - public EventKey(int bucketID, - int schemaID, - int dimensionDescriptorID, - int aggregatorID, - GPOMutable key) - { - setBucketID(bucketID); - setSchemaID(schemaID); - setDimensionDescriptorID(dimensionDescriptorID); - setAggregatorID(aggregatorID); - setKey(key); - } - - /** - * Creates an event key with the given data. This constructor assumes that the bucketID will be 0. - * - * @param schemaID The schemaID of the corresponding {@link DimensionalSchema}. - * @param dimensionDescriptorID The dimensionDescriptorID of the corresponding {@link DimensionDescriptor}. - * @param aggregatorID The id of the aggregator which should be used to aggregate the values - * corresponding to this - * {@link EventKey}. - * @param key The values of the keys. - */ - public EventKey(int schemaID, - int dimensionDescriptorID, - int aggregatorID, - GPOMutable key) - { - setSchemaID(schemaID); - setDimensionDescriptorID(dimensionDescriptorID); - setAggregatorID(aggregatorID); - setKey(key); - } - - /** - * Sets the dimension descriptor ID. - * - * @param dimensionDescriptorID The dimension descriptor ID to set. - */ - private void setDimensionDescriptorID(int dimensionDescriptorID) - { - this.dimensionDescriptorID = dimensionDescriptorID; - } - - /** - * Returns the dimension descriptor ID. - * - * @return The dimension descriptor ID. - */ - public int getDimensionDescriptorID() - { - return dimensionDescriptorID; - } - - /** - * Returns the aggregatorID. - * - * @return The aggregatorID. - */ - public int getAggregatorID() - { - return aggregatorID; - } - - /** - * Sets the aggregatorID. - * - * @param aggregatorID The aggregatorID to set. - */ - private void setAggregatorID(int aggregatorID) - { - this.aggregatorID = aggregatorID; - } - - /** - * Returns the schemaID. - * - * @return The schemaID to set. - */ - public int getSchemaID() - { - return schemaID; - } - - /** - * Sets the schemaID. - * - * @param schemaID The schemaID to set. - */ - private void setSchemaID(int schemaID) - { - this.schemaID = schemaID; - } - - /** - * Returns the key values. - * - * @return The key values. - */ - public GPOMutable getKey() - { - return key; - } - - /** - * Sets the bucektID. - * - * @param bucketID The bucketID. - */ - private void setBucketID(int bucketID) - { - this.bucketID = bucketID; - } - - /** - * Gets the bucketID. - * - * @return The bucketID. 
- */ - public int getBucketID() - { - return bucketID; - } - - /** - * Sets the key values. - * - * @param key The key values to set. - */ - private void setKey(GPOMutable key) - { - Preconditions.checkNotNull(key); - this.key = key; - } - - @Override - public int hashCode() - { - int hash = 3; - hash = 97 * hash + this.bucketID; - hash = 97 * hash + this.schemaID; - hash = 97 * hash + this.dimensionDescriptorID; - hash = 97 * hash + this.aggregatorID; - hash = 97 * hash + (this.key != null ? this.key.hashCode() : 0); - return hash; - } - - @Override - public boolean equals(Object obj) - { - if (obj == null) { - return false; - } - - if (getClass() != obj.getClass()) { - return false; - } - - final EventKey other = (EventKey)obj; - - if (this.bucketID != other.bucketID) { - return false; - } - - if (this.schemaID != other.schemaID) { - return false; - } - - if (this.dimensionDescriptorID != other.dimensionDescriptorID) { - return false; - } - - if (this.aggregatorID != other.aggregatorID) { - return false; - } - - if (this.key != other.key && (this.key == null || !this.key.equals(other.key))) { - return false; - } - - return true; - } - - @Override - public String toString() - { - return "EventKey{" + "schemaID=" + schemaID + ", dimensionDescriptorID=" + dimensionDescriptorID + - ", aggregatorIndex=" + aggregatorID + ", key=" + key + '}'; - } - - public static List<EventKey> createEventKeys(int schemaId, - int dimensionsDescriptorId, - int aggregatorId, - List<GPOMutable> keys) - { - List<EventKey> eventKeys = Lists.newArrayList(); - - for (GPOMutable key : keys) { - eventKeys.add(new EventKey(schemaId, dimensionsDescriptorId, aggregatorId, key)); - } - - return eventKeys; - } - } - - @Override - public int hashCode() - { - int hash = 5; - hash = 79 * hash + (this.aggregates != null ? this.aggregates.hashCode() : 0); - hash = 79 * hash + (this.eventKey != null ? this.eventKey.hashCode() : 0); - return hash; - } - - @Override - public boolean equals(Object obj) - { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final DimensionsEvent other = (DimensionsEvent)obj; - if (this.aggregates != other.aggregates && (this.aggregates == null || !this.aggregates.equals(other.aggregates))) { - return false; - } - if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) { - return false; - } - return true; - } - - public static class InputEvent extends DimensionsEvent - { - private static final long serialVersionUID = 201506210406L; - public boolean used = false; - - private InputEvent() - { - } - - /** - * This creates a {@link DimensionsEvent} from the given event key and aggregates. - * - * @param eventKey The key from which to create a {@link DimensionsEvent}. - * @param aggregates The aggregates from which to create {@link DimensionsEvent}. - */ - public InputEvent(EventKey eventKey, - GPOMutable aggregates) - { - setEventKey(eventKey); - setAggregates(aggregates); - } - - /** - * Creates a DimensionsEvent with the given key values, aggregates and ids. - * - * @param keys The values for fields in the key. - * @param aggregates The values for fields in the aggregate. - * @param bucketID The bucketID - * @param schemaID The schemaID. - * @param dimensionDescriptorID The dimensionsDescriptorID. - * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. 
- */ - public InputEvent(GPOMutable keys, - GPOMutable aggregates, - int bucketID, - int schemaID, - int dimensionDescriptorID, - int aggregatorIndex) - { - this.eventKey = new EventKey(bucketID, - schemaID, - dimensionDescriptorID, - aggregatorIndex, - keys); - setAggregates(aggregates); - } - - /** - * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0. - * - * @param keys The value for fields in the key. - * @param aggregates The value for fields in the aggregate. - * @param schemaID The schemaID. - * @param dimensionDescriptorID The dimensionsDescriptorID. - * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. - */ - public InputEvent(GPOMutable keys, - GPOMutable aggregates, - int schemaID, - int dimensionDescriptorID, - int aggregatorIndex) - { - this.eventKey = new EventKey(schemaID, - dimensionDescriptorID, - aggregatorIndex, - keys); - setAggregates(aggregates); - } - - @Override - public int hashCode() - { - return GPOUtils.hashcode(this.getKeys()); - } - - @Override - public boolean equals(Object obj) - { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final DimensionsEvent other = (DimensionsEvent)obj; - - if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) { - return false; - } - return true; - } - } - - public static class Aggregate extends DimensionsEvent implements AggregateEvent - { - private static final long serialVersionUID = 201506190110L; - - /** - * This is the aggregatorIndex assigned to this event. - */ - protected int aggregatorIndex; - private GPOMutable metaData; - - public Aggregate() - { - //for kryo and for extending classes - } - - /** - * This creates a {@link DimensionsEvent} from the given event key and aggregates. - * - * @param eventKey The key from which to create a {@link DimensionsEvent}. - * @param aggregates The aggregates from which to create {@link DimensionsEvent}. - */ - public Aggregate(EventKey eventKey, - GPOMutable aggregates) - { - setEventKey(eventKey); - setAggregates(aggregates); - } - - public Aggregate(EventKey eventKey, - GPOMutable aggregates, - GPOMutable metaData) - { - super(eventKey, - aggregates); - - this.metaData = metaData; - } - - /** - * Creates a DimensionsEvent with the given key values, aggregates and ids. - * - * @param keys The values for fields in the key. - * @param aggregates The values for fields in the aggregate. - * @param bucketID The bucketID - * @param schemaID The schemaID. - * @param dimensionDescriptorID The dimensionsDescriptorID. - * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. - */ - public Aggregate(GPOMutable keys, - GPOMutable aggregates, - int bucketID, - int schemaID, - int dimensionDescriptorID, - int aggregatorIndex) - { - this.eventKey = new EventKey(bucketID, - schemaID, - dimensionDescriptorID, - aggregatorIndex, - keys); - setAggregates(aggregates); - } - - public Aggregate(GPOMutable keys, - GPOMutable aggregates, - GPOMutable metaData, - int bucketID, - int schemaID, - int dimensionDescriptorID, - int aggregatorIndex) - { - this(keys, - aggregates, - bucketID, - schemaID, - dimensionDescriptorID, - aggregatorIndex); - - this.metaData = metaData; - } - - /** - * This creates an event with the given data. Note, this constructor assumes that the bucketID will be 0. - * - * @param keys The value for fields in the key. 
- * @param aggregates The value for fields in the aggregate. - * @param schemaID The schemaID. - * @param dimensionDescriptorID The dimensionsDescriptorID. - * @param aggregatorIndex The aggregatorIndex assigned to this event by the unifier. - */ - public Aggregate(GPOMutable keys, - GPOMutable aggregates, - int schemaID, - int dimensionDescriptorID, - int aggregatorIndex) - { - this.eventKey = new EventKey(schemaID, - dimensionDescriptorID, - aggregatorIndex, - keys); - setAggregates(aggregates); - } - - public Aggregate(GPOMutable keys, - GPOMutable aggregates, - GPOMutable metaData, - int schemaID, - int dimensionDescriptorID, - int aggregatorIndex) - { - this(keys, - aggregates, - schemaID, - dimensionDescriptorID, - aggregatorIndex); - - this.metaData = metaData; - } - - public void setMetaData(GPOMutable metaData) - { - this.metaData = metaData; - } - - public GPOMutable getMetaData() - { - return metaData; - } - - public void setAggregatorIndex(int aggregatorIndex) - { - this.aggregatorIndex = aggregatorIndex; - } - - @Override - public int getAggregatorIndex() - { - return aggregatorIndex; - } - - @Override - public int hashCode() - { - return GPOUtils.hashcode(this.getKeys()); - } - - @Override - public boolean equals(Object obj) - { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final DimensionsEvent other = (DimensionsEvent)obj; - - if (this.eventKey != other.eventKey && (this.eventKey == null || !this.eventKey.equals(other.eventKey))) { - return false; - } - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java deleted file mode 100644 index f92bfbf..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AbstractIncrementalAggregator.java +++ /dev/null @@ -1,190 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.dimensions.aggregator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -import com.datatorrent.lib.appdata.gpo.GPOMutable; -import com.datatorrent.lib.appdata.gpo.GPOUtils; -import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; -import com.datatorrent.lib.dimensions.DimensionsConversionContext; -import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; -import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey; -import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; - -/** - * * <p> - * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a - * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for - * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an - * {@link IncrementalAggregator}. - * </p> - * <p> - * {@link IncrementalAggregator}s are intended to be used with subclasses of - * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which - * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator - * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which - * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to - * the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields, - * one for cost and one for revenue. - * </p> - * - */ -public abstract class AbstractIncrementalAggregator implements IncrementalAggregator -{ - private static final long serialVersionUID = 201506211153L; - - /** - * The conversion context for this aggregator. 
- */ - protected DimensionsConversionContext context; - - public AbstractIncrementalAggregator() - { - } - - @Override - public void setDimensionsConversionContext(DimensionsConversionContext context) - { - this.context = Preconditions.checkNotNull(context); - } - - @Override - public Aggregate getGroup(InputEvent src, int aggregatorIndex) - { - src.used = true; - Aggregate aggregate = createAggregate(src, - context, - aggregatorIndex); - return aggregate; - } - - @Override - public int hashCode(InputEvent inputEvent) - { - long timestamp = -1L; - boolean hasTime = this.context.inputTimestampIndex != -1 - && this.context.outputTimebucketIndex != -1; - - if (hasTime) { - timestamp = inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex]; - inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex] - = this.context.dd.getCustomTimeBucket().roundDown(timestamp); - } - - int hashCode = GPOUtils.indirectHashcode(inputEvent.getKeys(), context.indexSubsetKeys); - - if (hasTime) { - inputEvent.getKeys().getFieldsLong()[this.context.inputTimestampIndex] = timestamp; - } - - return hashCode; - } - - @Override - public boolean equals(InputEvent inputEvent1, InputEvent inputEvent2) - { - long timestamp1 = 0; - long timestamp2 = 0; - - if (context.inputTimestampIndex != -1) { - timestamp1 = inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex]; - inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] = - context.dd.getCustomTimeBucket().roundDown(timestamp1); - - timestamp2 = inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex]; - inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] = - context.dd.getCustomTimeBucket().roundDown(timestamp2); - } - - boolean equals = GPOUtils.subsetEquals(inputEvent2.getKeys(), - inputEvent1.getKeys(), - context.indexSubsetKeys); - - if (context.inputTimestampIndex != -1) { - inputEvent1.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp1; - inputEvent2.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp2; - } - - return equals; - } - - /** - * Creates an {@link Aggregate} from the given {@link InputEvent}. - * - * @param inputEvent The {@link InputEvent} to unpack into an {@link Aggregate}. - * @param context The conversion context required to transform the {@link InputEvent} into - * the correct {@link Aggregate}. - * @param aggregatorIndex The aggregatorIndex assigned to this {@link Aggregate}. - * @return The converted {@link Aggregate}. - */ - public static Aggregate createAggregate(InputEvent inputEvent, - DimensionsConversionContext context, - int aggregatorIndex) - { - GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor); - EventKey eventKey = createEventKey(inputEvent, - context, - aggregatorIndex); - - Aggregate aggregate = new Aggregate(eventKey, - aggregates); - aggregate.setAggregatorIndex(aggregatorIndex); - - return aggregate; - } - - /** - * Creates an {@link EventKey} from the given {@link InputEvent}. - * - * @param inputEvent The {@link InputEvent} to extract an {@link EventKey} from. - * @param context The conversion context required to extract the {@link EventKey} from - * the given {@link InputEvent}. - * @param aggregatorIndex The aggregatorIndex to assign to this {@link InputEvent}. - * @return The {@link EventKey} extracted from the given {@link InputEvent}. 
- */ - public static EventKey createEventKey(InputEvent inputEvent, - DimensionsConversionContext context, - int aggregatorIndex) - { - GPOMutable keys = new GPOMutable(context.keyDescriptor); - GPOUtils.indirectCopy(keys, inputEvent.getKeys(), context.indexSubsetKeys); - - if (context.outputTimebucketIndex >= 0) { - CustomTimeBucket timeBucket = context.dd.getCustomTimeBucket(); - - keys.getFieldsInteger()[context.outputTimebucketIndex] = context.customTimeBucketRegistry.getTimeBucketId( - timeBucket); - keys.getFieldsLong()[context.outputTimestampIndex] = - timeBucket.roundDown(inputEvent.getKeys().getFieldsLong()[context.inputTimestampIndex]); - } - - EventKey eventKey = new EventKey(context.schemaID, - context.dimensionsDescriptorID, - context.aggregatorID, - keys); - - return eventKey; - } - - private static final Logger LOG = LoggerFactory.getLogger(AbstractIncrementalAggregator.class); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java deleted file mode 100644 index e8f2f3e..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregateEvent.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.dimensions.aggregator; - -import it.unimi.dsi.fastutil.Hash; - -/** - * @since 3.3.0 - */ -public interface AggregateEvent -{ - int getAggregatorIndex(); - - public static interface Aggregator<EVENT, AGGREGATE extends AggregateEvent> extends Hash.Strategy<EVENT> - { - AGGREGATE getGroup(EVENT src, int aggregatorIndex); - - void aggregate(AGGREGATE dest, EVENT src); - - void aggregate(AGGREGATE dest, AGGREGATE src); - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java deleted file mode 100644 index c15bf25..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorAverage.java +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.dimensions.aggregator; - -import java.util.List; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; - -import com.datatorrent.api.annotation.Name; -import com.datatorrent.lib.appdata.gpo.GPOMutable; -import com.datatorrent.lib.appdata.schemas.Fields; -import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; -import com.datatorrent.lib.appdata.schemas.Type; - -/** - * This is the average {@link OTFAggregator}. - * - * @since 3.1.0 - */ -@Name("AVG") -public class AggregatorAverage implements OTFAggregator -{ - private static final long serialVersionUID = 20154301644L; - - /** - * The array index of the sum aggregates in the argument list of the {@link #aggregate} function. - */ - public static int SUM_INDEX = 0; - /** - * The array index of the count aggregates in the argument list of the {@link #aggregate} function. - */ - public static int COUNT_INDEX = 1; - /** - * The singleton instance of this class. - */ - public static final AggregatorAverage INSTANCE = new AggregatorAverage(); - - /** - * The list of {@link IncrementalAggregator}s that this {@link OTFAggregator} depends on. - */ - public static final transient List<Class<? extends IncrementalAggregator>> CHILD_AGGREGATORS = - ImmutableList.of(AggregatorIncrementalType.SUM.getAggregator().getClass(), - AggregatorIncrementalType.COUNT.getAggregator().getClass()); - - /** - * Constructor for singleton pattern. - */ - protected AggregatorAverage() - { - //Do nothing - } - - @Override - public List<Class<? extends IncrementalAggregator>> getChildAggregators() - { - return CHILD_AGGREGATORS; - } - - @Override - public GPOMutable aggregate(GPOMutable... 
aggregates) - { - Preconditions.checkArgument(aggregates.length == getChildAggregators().size(), - "The number of arguments " + aggregates.length + - " should be the same as the number of child aggregators " + getChildAggregators().size()); - - GPOMutable sumAggregation = aggregates[SUM_INDEX]; - GPOMutable countAggregation = aggregates[COUNT_INDEX]; - - FieldsDescriptor fieldsDescriptor = sumAggregation.getFieldDescriptor(); - Fields fields = fieldsDescriptor.getFields(); - GPOMutable result = new GPOMutable(AggregatorUtils.getOutputFieldsDescriptor(fields, this)); - - long count = countAggregation.getFieldsLong()[0]; - - for (String field : fields.getFields()) { - Type type = sumAggregation.getFieldDescriptor().getType(field); - - switch (type) { - case BYTE: { - double val = ((double)sumAggregation.getFieldByte(field)) / - ((double)count); - result.setField(field, val); - break; - } - case SHORT: { - double val = ((double)sumAggregation.getFieldShort(field)) / - ((double)count); - result.setField(field, val); - break; - } - case INTEGER: { - double val = ((double)sumAggregation.getFieldInt(field)) / - ((double)count); - result.setField(field, val); - break; - } - case LONG: { - double val = ((double)sumAggregation.getFieldLong(field)) / - ((double)count); - result.setField(field, val); - break; - } - case FLOAT: { - double val = sumAggregation.getFieldFloat(field) / - ((double)count); - result.setField(field, val); - break; - } - case DOUBLE: { - double val = sumAggregation.getFieldDouble(field) / - ((double)count); - result.setField(field, val); - break; - } - default: { - throw new UnsupportedOperationException("The type " + type + " is not supported."); - } - } - } - - return result; - } - - @Override - public Type getOutputType() - { - return Type.DOUBLE; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java deleted file mode 100644 index 8566e1c..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCount.java +++ /dev/null @@ -1,128 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.dimensions.aggregator; - -import java.util.Collections; -import java.util.Map; - -import com.google.common.collect.Maps; - -import com.datatorrent.api.annotation.Name; -import com.datatorrent.lib.appdata.gpo.GPOMutable; -import com.datatorrent.lib.appdata.gpo.GPOUtils; -import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; -import com.datatorrent.lib.appdata.schemas.Type; -import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; -import com.datatorrent.lib.dimensions.DimensionsEvent.EventKey; -import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; - -/** - * This {@link IncrementalAggregator} performs a count of the number of times an input is encountered. - * - * @since 3.1.0 - */ -@Name("COUNT") -public class AggregatorCount extends AbstractIncrementalAggregator -{ - private static final long serialVersionUID = 20154301645L; - - /** - * This is a map whose keys represent input types and whose values - * represent the corresponding output types. - */ - public static final transient Map<Type, Type> TYPE_CONVERSION_MAP; - - static { - Map<Type, Type> typeConversionMap = Maps.newHashMap(); - - for (Type type : Type.values()) { - typeConversionMap.put(type, Type.LONG); - } - - TYPE_CONVERSION_MAP = Collections.unmodifiableMap(typeConversionMap); - } - - public AggregatorCount() - { - //Do nothing - } - - @Override - public Aggregate getGroup(InputEvent src, int aggregatorIndex) - { - src.used = true; - GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor); - GPOMutable keys = new GPOMutable(context.keyDescriptor); - GPOUtils.indirectCopy(keys, src.getKeys(), context.indexSubsetKeys); - - EventKey eventKey = createEventKey(src, - context, - aggregatorIndex); - - long[] longFields = aggregates.getFieldsLong(); - - for (int index = 0; - index < longFields.length; - index++) { - longFields[index] = 0; - } - - return new Aggregate(eventKey, - aggregates); - } - - @Override - public void aggregate(Aggregate dest, InputEvent src) - { - long[] fieldsLong = dest.getAggregates().getFieldsLong(); - - for (int index = 0; - index < fieldsLong.length; - index++) { - //increment count - fieldsLong[index]++; - } - } - - @Override - public void aggregate(Aggregate destAgg, Aggregate srcAgg) - { - long[] destLongs = destAgg.getAggregates().getFieldsLong(); - long[] srcLongs = srcAgg.getAggregates().getFieldsLong(); - - for (int index = 0; - index < destLongs.length; - index++) { - //aggregate count - destLongs[index] += srcLongs[index]; - } - } - - @Override - public Type getOutputType(Type inputType) - { - return TYPE_CONVERSION_MAP.get(inputType); - } - - @Override - public FieldsDescriptor getMetaDataDescriptor() - { - return null; - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java deleted file mode 100644 index f5924b8..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorCumSum.java +++ /dev/null @@ -1,233 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.dimensions.aggregator; - -import java.util.List; -import java.util.Map; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import com.datatorrent.api.annotation.Name; -import com.datatorrent.lib.appdata.gpo.GPOMutable; -import com.datatorrent.lib.appdata.gpo.GPOUtils; -import com.datatorrent.lib.appdata.gpo.Serde; -import com.datatorrent.lib.appdata.gpo.SerdeFieldsDescriptor; -import com.datatorrent.lib.appdata.gpo.SerdeListGPOMutable; -import com.datatorrent.lib.appdata.gpo.SerdeObjectPayloadFix; -import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; -import com.datatorrent.lib.appdata.schemas.Type; -import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; -import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; - -@Name("CUM_SUM") -/** - * @since 3.1.0 - */ - -public class AggregatorCumSum extends AggregatorSum -{ - private static final long serialVersionUID = 201506280518L; - - public static final int KEY_FD_INDEX = 0; - public static final int AGGREGATE_FD_INDEX = 1; - public static final int KEYS_INDEX = 2; - public static final int AGGREGATES_INDEX = 3; - - public static final FieldsDescriptor META_DATA_FIELDS_DESCRIPTOR; - - static { - Map<String, Type> fieldToType = Maps.newHashMap(); - fieldToType.put("fdkeys", Type.OBJECT); - fieldToType.put("fdvalues", Type.OBJECT); - fieldToType.put("keys", Type.OBJECT); - fieldToType.put("values", Type.OBJECT); - - Map<String, Serde> fieldToSerde = Maps.newHashMap(); - fieldToSerde.put("fdkeys", SerdeFieldsDescriptor.INSTANCE); - fieldToSerde.put("fdvalues", SerdeFieldsDescriptor.INSTANCE); - fieldToSerde.put("keys", SerdeListGPOMutable.INSTANCE); - fieldToSerde.put("values", SerdeListGPOMutable.INSTANCE); - - META_DATA_FIELDS_DESCRIPTOR = new FieldsDescriptor(fieldToType, - fieldToSerde, - new PayloadFix()); - } - - public AggregatorCumSum() - { - } - - @Override - public Aggregate getGroup(InputEvent src, int aggregatorIndex) - { - src.used = true; - Aggregate agg = createAggregate(src, - context, - aggregatorIndex); - - GPOUtils.indirectCopy(agg.getAggregates(), src.getAggregates(), context.indexSubsetAggregates); - - GPOMutable metaData = new GPOMutable(getMetaDataDescriptor()); - - GPOMutable fullKey = new GPOMutable(src.getKeys()); - - if (context.inputTimestampIndex >= 0) { - fullKey.getFieldsLong()[context.inputTimestampIndex] = -1L; - } - - List<GPOMutable> keys = Lists.newArrayList(fullKey); - - GPOMutable value = new GPOMutable(agg.getAggregates()); - List<GPOMutable> values = Lists.newArrayList(value); - - metaData.getFieldsObject()[KEY_FD_INDEX] = fullKey.getFieldDescriptor(); - metaData.getFieldsObject()[AGGREGATE_FD_INDEX] = value.getFieldDescriptor(); - metaData.getFieldsObject()[KEYS_INDEX] = keys; - metaData.getFieldsObject()[AGGREGATES_INDEX] 
= values; - agg.setMetaData(metaData); - - return agg; - } - - @Override - public void aggregate(Aggregate dest, InputEvent src) - { - @SuppressWarnings("unchecked") - List<GPOMutable> destKeys = - (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX]; - - @SuppressWarnings("unchecked") - List<GPOMutable> destAggregates = - (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX]; - - long timestamp = 0L; - - if (context.inputTimestampIndex >= 0) { - timestamp = src.getKeys().getFieldsLong()[context.inputTimestampIndex]; - src.getKeys().getFieldsLong()[context.inputTimestampIndex] = -1L; - } - - if (!contains(destKeys, src.getKeys())) { - destKeys.add(new GPOMutable(src.getKeys())); - - GPOMutable aggregates = new GPOMutable(context.aggregateDescriptor); - GPOUtils.indirectCopy(aggregates, src.getAggregates(), context.indexSubsetAggregates); - - destAggregates.add(aggregates); - - this.aggregateAggs(dest.getAggregates(), aggregates); - } - - if (context.inputTimestampIndex >= 0) { - src.getKeys().getFieldsLong()[context.inputTimestampIndex] = timestamp; - } - } - - @Override - public void aggregate(Aggregate dest, Aggregate src) - { - dest.getMetaData().applyObjectPayloadFix(); - src.getMetaData().applyObjectPayloadFix(); - - @SuppressWarnings("unchecked") - List<GPOMutable> destKeys = - (List<GPOMutable>)dest.getMetaData().getFieldsObject()[KEYS_INDEX]; - - @SuppressWarnings("unchecked") - List<GPOMutable> srcKeys = - (List<GPOMutable>)src.getMetaData().getFieldsObject()[KEYS_INDEX]; - - @SuppressWarnings("unchecked") - List<GPOMutable> destAggregates = - (List<GPOMutable>)dest.getMetaData().getFieldsObject()[AGGREGATES_INDEX]; - - @SuppressWarnings("unchecked") - List<GPOMutable> srcAggregates = - (List<GPOMutable>)src.getMetaData().getFieldsObject()[AGGREGATES_INDEX]; - - List<GPOMutable> newKeys = Lists.newArrayList(); - List<GPOMutable> newAggs = Lists.newArrayList(); - - for (int index = 0; - index < srcKeys.size(); - index++) { - GPOMutable currentSrcKey = srcKeys.get(index); - GPOMutable currentSrcAgg = srcAggregates.get(index); - - if (!contains(destKeys, currentSrcKey)) { - newKeys.add(currentSrcKey); - newAggs.add(currentSrcAgg); - - this.aggregateAggs(dest.getAggregates(), currentSrcAgg); - } - } - - destKeys.addAll(newKeys); - destAggregates.addAll(newAggs); - } - - private boolean contains(List<GPOMutable> mutables, GPOMutable mutable) - { - for (int index = 0; - index < mutables.size(); - index++) { - GPOMutable mutableFromList = mutables.get(index); - - if (GPOUtils.equals(mutableFromList, mutable)) { - return true; - } - } - - return false; - } - - @Override - public FieldsDescriptor getMetaDataDescriptor() - { - return META_DATA_FIELDS_DESCRIPTOR; - } - - public static class PayloadFix implements SerdeObjectPayloadFix - { - @Override - public void fix(Object[] objects) - { - FieldsDescriptor keyfd = (FieldsDescriptor)objects[KEY_FD_INDEX]; - FieldsDescriptor valuefd = (FieldsDescriptor)objects[AGGREGATE_FD_INDEX]; - - @SuppressWarnings("unchecked") - List<GPOMutable> keyMutables = (List<GPOMutable>)objects[KEYS_INDEX]; - @SuppressWarnings("unchecked") - List<GPOMutable> aggregateMutables = (List<GPOMutable>)objects[AGGREGATES_INDEX]; - - fix(keyfd, keyMutables); - fix(valuefd, aggregateMutables); - } - - private void fix(FieldsDescriptor fd, List<GPOMutable> mutables) - { - for (int index = 0; - index < mutables.size(); - index++) { - mutables.get(index).setFieldDescriptor(fd); - } - } - } -}
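The AbstractIncrementalAggregator javadoc above describes aggregation performed in place, field by field, with events grouped by their dimension key and with timestamps rounded down to the dimension's time bucket (via CustomTimeBucket.roundDown()) before keys are hashed or compared. As a rough, self-contained illustration only -- plain Java with hypothetical names such as IncrementalSumSketch and BUCKET_MILLIS, not the Malhar GPOMutable/EventKey API -- a minimal incremental sum along those lines might look like this:

// Hypothetical sketch (not the Malhar API): an incremental aggregator folds each
// input into a running aggregate, field by field, keyed by the dimension key
// plus the timestamp rounded down to its time bucket.
import java.util.HashMap;
import java.util.Map;

public class IncrementalSumSketch
{
  // Assumed bucket granularity for the sketch: one minute.
  private static final long BUCKET_MILLIS = 60_000L;

  // aggregate key -> running sums, one entry per aggregated field
  private final Map<String, Map<String, Long>> store = new HashMap<>();

  // Rounds a timestamp down to its bucket, standing in for CustomTimeBucket.roundDown().
  private static long roundDown(long timestampMillis)
  {
    return timestampMillis - (timestampMillis % BUCKET_MILLIS);
  }

  // Folds one input event (e.g. fields "cost" and "revenue") into the aggregate
  // for its key; only the field being summed participates in its own aggregation.
  public void aggregate(String dimensionKey, long timestampMillis, Map<String, Long> fields)
  {
    String aggregateKey = dimensionKey + "@" + roundDown(timestampMillis);
    Map<String, Long> sums = store.computeIfAbsent(aggregateKey, k -> new HashMap<>());
    for (Map.Entry<String, Long> field : fields.entrySet()) {
      sums.merge(field.getKey(), field.getValue(), Long::sum);
    }
  }

  public Map<String, Long> get(String dimensionKey, long timestampMillis)
  {
    return store.get(dimensionKey + "@" + roundDown(timestampMillis));
  }

  public static void main(String[] args)
  {
    IncrementalSumSketch sketch = new IncrementalSumSketch();
    sketch.aggregate("product=widget", 1_000L, Map.of("cost", 10L, "revenue", 25L));
    sketch.aggregate("product=widget", 2_000L, Map.of("cost", 5L, "revenue", 15L));
    // Both events fall into the same bucket, so the running sums are cost=15, revenue=40.
    System.out.println(sketch.get("product=widget", 1_500L));
  }
}

Folding each event into the running entry as it arrives is what lets the incremental aggregators avoid retaining the individual input events.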

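The AggregatorAverage code above takes the other approach: as an OTFAggregator it stores no average at all, declaring SUM and COUNT as its CHILD_AGGREGATORS and dividing one by the other, per field, when a result is requested, always producing Type.DOUBLE. A stripped-down sketch of that idea, reusing the numbers from the previous example and again using hypothetical names (OnTheFlyAverageSketch, plain maps instead of GPOMutable), might be:

// Hypothetical sketch (not the Malhar OTFAggregator API): an "on the fly" average
// never stores an average; it divides the SUM child aggregate by the COUNT child
// aggregate at read time, producing a double per field.
import java.util.HashMap;
import java.util.Map;

public class OnTheFlyAverageSketch
{
  // sums and count stand in for the SUM and COUNT child aggregates.
  public static Map<String, Double> average(Map<String, Long> sums, long count)
  {
    Map<String, Double> result = new HashMap<>();
    for (Map.Entry<String, Long> field : sums.entrySet()) {
      // Same pattern as the switch cases above: cast to double, divide by the count.
      result.put(field.getKey(), field.getValue() / (double)count);
    }
    return result;
  }

  public static void main(String[] args)
  {
    Map<String, Long> sums = Map.of("cost", 15L, "revenue", 40L);
    // With a count of 2, the derived averages are cost=7.5 and revenue=20.0.
    System.out.println(average(sums, 2L));
  }
}

Deriving AVG from SUM and COUNT keeps the stored state mergeable across windows and partitions, which a stored average by itself would not be.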