http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java deleted file mode 100644 index e1bf7d4..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorFirst.java +++ /dev/null @@ -1,84 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.lib.dimensions.aggregator;

import com.datatorrent.api.annotation.Name;
import com.datatorrent.lib.appdata.gpo.GPOUtils;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.Type;
import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;

/**
 * <p>
 * Incremental aggregator registered under the name {@code FIRST}. The aggregate of a group is seeded from the
 * {@link InputEvent} that creates the group, and both {@code aggregate} overloads are no-ops, so later
 * {@link InputEvent}s never modify it.
 * </p>
 * <p>
 * <b>Note:</b> when aggregates are combined in a unifier there is no way to know which one was produced first, so
 * the destination aggregate is simply left as it is.
 * </p>
 *
 * @since 3.1.0
 */
@Name("FIRST")
public class AggregatorFirst extends AbstractIncrementalAggregator
{
  private static final long serialVersionUID = 20154301646L;

  /**
   * Creates the aggregator; there is no state to initialize.
   */
  public AggregatorFirst()
  {
    // Intentionally empty.
  }

  /**
   * Builds the aggregate for a new group through the superclass and seeds it from the creating event.
   *
   * @param src             The event that opens the group.
   * @param aggregatorIndex Passed through unchanged to {@code super.getGroup}.
   * @return The aggregate returned by the superclass after the event's aggregates were copied into it with
   * {@link GPOUtils#indirectCopy} using {@code context.indexSubsetAggregates}.
   */
  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    final Aggregate seeded = super.getGroup(src, aggregatorIndex);
    GPOUtils.indirectCopy(seeded.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
    return seeded;
  }

  /**
   * Resolves the output type by looking {@code inputType} up in {@code AggregatorUtils.IDENTITY_TYPE_MAP}.
   *
   * @param inputType The type of the aggregated field.
   * @return The entry stored for {@code inputType} in {@code AggregatorUtils.IDENTITY_TYPE_MAP}.
   */
  @Override
  public Type getOutputType(Type inputType)
  {
    final Type mapped = AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
    return mapped;
  }

  /**
   * Deliberately does nothing: the group already holds the first event, and later events must not change it.
   */
  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    // No-op by design.
  }

  /**
   * Deliberately does nothing: the destination aggregate is kept untouched when two aggregates are combined.
   */
  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    // No-op by design.
  }

  /**
   * @return Always {@code null}; this aggregator supplies no metadata descriptor.
   */
  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java deleted file mode 100644 index 09190e1..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorIncrementalType.java +++ /dev/null @@ -1,79 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.lib.dimensions.aggregator;

import java.util.Collections;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

/**
 * Enumerates the built-in {@link IncrementalAggregator}s. Each constant owns one aggregator instance, and two
 * unmodifiable lookup tables keyed by constant name are published as static fields.
 * <p>
 * <b>Note:</b> {@link #NAME_TO_ORDINAL} exposes {@link #ordinal()} values, so reordering or inserting constants
 * changes the numbers handed to any consumer of that map.
 * </p>
 *
 * @since 3.1.0
 */

public enum AggregatorIncrementalType
{
  SUM(new AggregatorSum()),
  MIN(new AggregatorMin()),
  MAX(new AggregatorMax()),
  COUNT(new AggregatorCount()),
  LAST(new AggregatorLast()),
  FIRST(new AggregatorFirst()),
  CUM_SUM(new AggregatorCumSum());

  /**
   * Unmodifiable map from the {@link #name()} of each constant to its {@link #ordinal()}.
   */
  public static final Map<String, Integer> NAME_TO_ORDINAL;
  /**
   * Unmodifiable map from the {@link #name()} of each constant to the aggregator it owns.
   */
  public static final Map<String, IncrementalAggregator> NAME_TO_AGGREGATOR;

  /**
   * The aggregator owned by this constant; checked to be non-null in the constructor.
   */
  private final IncrementalAggregator aggregator;

  // Runs after all constants exist, so values() is fully populated here.
  static {
    final AggregatorIncrementalType[] constants = AggregatorIncrementalType.values();
    final Map<String, Integer> ordinalsByName = Maps.newHashMap();
    final Map<String, IncrementalAggregator> aggregatorsByName = Maps.newHashMap();

    for (int position = 0; position < constants.length; position++) {
      final AggregatorIncrementalType constant = constants[position];
      final String key = constant.name();
      ordinalsByName.put(key, constant.ordinal());
      aggregatorsByName.put(key, constant.getAggregator());
    }

    NAME_TO_ORDINAL = Collections.unmodifiableMap(ordinalsByName);
    NAME_TO_AGGREGATOR = Collections.unmodifiableMap(aggregatorsByName);
  }

  /**
   * Binds the given aggregator to this constant.
   *
   * @param aggregator The aggregator to own; must not be {@code null}.
   * @throws NullPointerException if {@code aggregator} is {@code null}.
   */
  AggregatorIncrementalType(IncrementalAggregator aggregator)
  {
    this.aggregator = Preconditions.checkNotNull(aggregator);
  }

  /**
   * @return The {@link IncrementalAggregator} owned by this constant.
   */
  public IncrementalAggregator getAggregator()
  {
    return aggregator;
  }

  private static final Logger LOG = LoggerFactory.getLogger(AggregatorIncrementalType.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java
b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java deleted file mode 100644 index f727036..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorLast.java +++ /dev/null @@ -1,84 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.lib.dimensions.aggregator;

import com.datatorrent.api.annotation.Name;
import com.datatorrent.lib.appdata.gpo.GPOUtils;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.Type;
import com.datatorrent.lib.dimensions.DimensionsEvent;
import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;

/**
 * <p>
 * Incremental aggregator registered under the name {@code LAST}. Every {@link InputEvent} overwrites the
 * aggregate of its group, so the aggregate always reflects the most recently applied event and all earlier
 * {@link InputEvent}s are discarded.
 * </p>
 * <p>
 * <b>Note:</b> when aggregates are combined in a unifier there is no way to know which one was produced last, so
 * the source aggregate is arbitrarily treated as the last one and copied over the destination.
 * </p>
 *
 * @since 3.1.0
 */
@Name("LAST")
public class AggregatorLast extends AbstractIncrementalAggregator
{
  private static final long serialVersionUID = 20154301647L;

  /**
   * Creates the aggregator; there is no state to initialize.
   */
  public AggregatorLast()
  {
    // Intentionally empty.
  }

  /**
   * Builds the aggregate for a new group through the superclass and seeds it from the creating event.
   *
   * @param src             The event that opens the group.
   * @param aggregatorIndex Passed through unchanged to {@code super.getGroup}.
   * @return The aggregate returned by the superclass after the event's aggregates were copied into it with
   * {@link GPOUtils#indirectCopy} using {@code context.indexSubsetAggregates}.
   */
  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    final Aggregate seeded = super.getGroup(src, aggregatorIndex);
    GPOUtils.indirectCopy(seeded.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
    return seeded;
  }

  /**
   * Resolves the output type by looking {@code inputType} up in {@code AggregatorUtils.IDENTITY_TYPE_MAP}.
   *
   * @param inputType The type of the aggregated field.
   * @return The entry stored for {@code inputType} in {@code AggregatorUtils.IDENTITY_TYPE_MAP}.
   */
  @Override
  public Type getOutputType(Type inputType)
  {
    final Type mapped = AggregatorUtils.IDENTITY_TYPE_MAP.get(inputType);
    return mapped;
  }

  /**
   * Overwrites the destination's aggregates with those of the incoming event, using the same
   * {@link GPOUtils#indirectCopy} call that seeds a new group.
   *
   * @param dest The aggregate to overwrite.
   * @param src  The event whose aggregates replace the current ones.
   */
  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    // The newest event always wins.
    GPOUtils.indirectCopy(dest.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
  }

  /**
   * Combines two aggregates by copying {@code src} over {@code dest} with {@link DimensionsEvent#copy}.
   *
   * @param dest The aggregate that receives the copy.
   * @param src  The aggregate treated as the last one.
   */
  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    DimensionsEvent.copy(dest, src);
  }

  /**
   * @return Always {@code null}; this aggregator supplies no metadata descriptor.
   */
  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    return null;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java deleted file mode 100644 index 25f9db2..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMax.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */
package com.datatorrent.lib.dimensions.aggregator;

import com.datatorrent.api.annotation.Name;
import com.datatorrent.lib.appdata.gpo.GPOMutable;
import com.datatorrent.lib.appdata.gpo.GPOUtils;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.Type;
import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;

/**
 * This {@link IncrementalAggregator}, registered under the name {@code MAX}, keeps the per-field maximum of the
 * values supplied by {@link InputEvent}s. Byte, short, integer, long, float and double field arrays are handled;
 * a field array that is {@code null} on the destination is skipped.
 * <p>
 * Values are compared with a strict {@code >}/{@code <} test, so for float and double fields a {@code NaN} never
 * replaces a stored value and a stored {@code NaN} is never replaced.
 * </p>
 *
 * @since 3.1.0
 */
@Name("MAX")
public class AggregatorMax extends AbstractIncrementalAggregator
{
  private static final long serialVersionUID = 201503120332L;

  /**
   * Creates the aggregator; there is no state to initialize.
   */
  public AggregatorMax()
  {
    // Intentionally empty.
  }

  /**
   * Builds the aggregate for a new group through the superclass and seeds it from the creating event.
   *
   * @param src             The event that opens the group.
   * @param aggregatorIndex Passed through unchanged to {@code super.getGroup}.
   * @return The aggregate returned by the superclass after the event's aggregates were copied into it with
   * {@link GPOUtils#indirectCopy} using {@code context.indexSubsetAggregates}.
   */
  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    final Aggregate seeded = super.getGroup(src, aggregatorIndex);
    GPOUtils.indirectCopy(seeded.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
    return seeded;
  }

  /**
   * Raises every destination field to the corresponding event value when the event value is larger. The event
   * value for destination slot {@code i} is read at the position given by the matching index subset in
   * {@code context.indexSubsetAggregates}.
   *
   * @param dest The running aggregate, updated in place.
   * @param src  The event to fold in.
   */
  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    final GPOMutable running = dest.getAggregates();
    final GPOMutable incoming = src.getAggregates();

    final byte[] runningBytes = running.getFieldsByte();
    if (runningBytes != null) {
      maxIndirect(runningBytes, incoming.getFieldsByte(), context.indexSubsetAggregates.fieldsByteIndexSubset);
    }

    final short[] runningShorts = running.getFieldsShort();
    if (runningShorts != null) {
      maxIndirect(runningShorts, incoming.getFieldsShort(), context.indexSubsetAggregates.fieldsShortIndexSubset);
    }

    final int[] runningInts = running.getFieldsInteger();
    if (runningInts != null) {
      maxIndirect(runningInts, incoming.getFieldsInteger(), context.indexSubsetAggregates.fieldsIntegerIndexSubset);
    }

    final long[] runningLongs = running.getFieldsLong();
    if (runningLongs != null) {
      maxIndirect(runningLongs, incoming.getFieldsLong(), context.indexSubsetAggregates.fieldsLongIndexSubset);
    }

    final float[] runningFloats = running.getFieldsFloat();
    if (runningFloats != null) {
      maxIndirect(runningFloats, incoming.getFieldsFloat(), context.indexSubsetAggregates.fieldsFloatIndexSubset);
    }

    final double[] runningDoubles = running.getFieldsDouble();
    if (runningDoubles != null) {
      maxIndirect(runningDoubles, incoming.getFieldsDouble(), context.indexSubsetAggregates.fieldsDoubleIndexSubset);
    }
  }

  /**
   * Combines two aggregates slot by slot, keeping the larger value of each pair in {@code dest}. Both aggregates
   * are indexed with the same position, so no index subset is involved.
   *
   * @param dest The aggregate updated in place.
   * @param src  The aggregate merged into {@code dest}.
   */
  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    final GPOMutable running = dest.getAggregates();
    final GPOMutable other = src.getAggregates();

    final byte[] runningBytes = running.getFieldsByte();
    if (runningBytes != null) {
      maxDirect(runningBytes, other.getFieldsByte());
    }

    final short[] runningShorts = running.getFieldsShort();
    if (runningShorts != null) {
      maxDirect(runningShorts, other.getFieldsShort());
    }

    final int[] runningInts = running.getFieldsInteger();
    if (runningInts != null) {
      maxDirect(runningInts, other.getFieldsInteger());
    }

    final long[] runningLongs = running.getFieldsLong();
    if (runningLongs != null) {
      maxDirect(runningLongs, other.getFieldsLong());
    }

    final float[] runningFloats = running.getFieldsFloat();
    if (runningFloats != null) {
      maxDirect(runningFloats, other.getFieldsFloat());
    }

    final double[] runningDoubles = running.getFieldsDouble();
    if (runningDoubles != null) {
      maxDirect(runningDoubles, other.getFieldsDouble());
    }
  }

  /**
   * Resolves the output type by looking {@code inputType} up in {@code AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP}.
   *
   * @param inputType The type of the aggregated field.
   * @return The entry stored for {@code inputType} in {@code AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP}.
   */
  @Override
  public Type getOutputType(Type inputType)
  {
    final Type mapped = AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
    return mapped;
  }

  /**
   * @return Always {@code null}; this aggregator supplies no metadata descriptor.
   */
  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    return null;
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly larger (byte fields). */
  private static void maxIndirect(byte[] dest, byte[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final byte candidate = src[srcIndices[slot]];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly larger (short fields). */
  private static void maxIndirect(short[] dest, short[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final short candidate = src[srcIndices[slot]];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly larger (integer fields). */
  private static void maxIndirect(int[] dest, int[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final int candidate = src[srcIndices[slot]];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly larger (long fields). */
  private static void maxIndirect(long[] dest, long[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final long candidate = src[srcIndices[slot]];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly larger (float fields). */
  private static void maxIndirect(float[] dest, float[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final float candidate = src[srcIndices[slot]];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly larger (double fields). */
  private static void maxIndirect(double[] dest, double[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final double candidate = src[srcIndices[slot]];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly larger (byte fields). */
  private static void maxDirect(byte[] dest, byte[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final byte candidate = src[slot];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly larger (short fields). */
  private static void maxDirect(short[] dest, short[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final short candidate = src[slot];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly larger (integer fields). */
  private static void maxDirect(int[] dest, int[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final int candidate = src[slot];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly larger (long fields). */
  private static void maxDirect(long[] dest, long[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final long candidate = src[slot];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly larger (float fields). */
  private static void maxDirect(float[] dest, float[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final float candidate = src[slot];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly larger (double fields). */
  private static void maxDirect(double[] dest, double[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final double candidate = src[slot];
      if (candidate > dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java deleted file mode 100644 index b377e9b..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorMin.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */
package com.datatorrent.lib.dimensions.aggregator;

import com.datatorrent.api.annotation.Name;
import com.datatorrent.lib.appdata.gpo.GPOMutable;
import com.datatorrent.lib.appdata.gpo.GPOUtils;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;
import com.datatorrent.lib.appdata.schemas.Type;
import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate;
import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent;

/**
 * This {@link IncrementalAggregator}, registered under the name {@code MIN}, keeps the per-field minimum of the
 * values supplied by {@link InputEvent}s. Byte, short, integer, long, float and double field arrays are handled;
 * a field array that is {@code null} on the destination is skipped.
 * <p>
 * Values are compared with a strict {@code <}/{@code >} test, so for float and double fields a {@code NaN} never
 * replaces a stored value and a stored {@code NaN} is never replaced.
 * </p>
 *
 * @since 3.1.0
 */
@Name("MIN")
public class AggregatorMin extends AbstractIncrementalAggregator
{
  private static final long serialVersionUID = 20154301648L;

  /**
   * Creates the aggregator; there is no state to initialize.
   */
  public AggregatorMin()
  {
    // Intentionally empty.
  }

  /**
   * Builds the aggregate for a new group through the superclass and seeds it from the creating event.
   *
   * @param src             The event that opens the group.
   * @param aggregatorIndex Passed through unchanged to {@code super.getGroup}.
   * @return The aggregate returned by the superclass after the event's aggregates were copied into it with
   * {@link GPOUtils#indirectCopy} using {@code context.indexSubsetAggregates}.
   */
  @Override
  public Aggregate getGroup(InputEvent src, int aggregatorIndex)
  {
    final Aggregate seeded = super.getGroup(src, aggregatorIndex);
    GPOUtils.indirectCopy(seeded.getAggregates(), src.getAggregates(), context.indexSubsetAggregates);
    return seeded;
  }

  /**
   * Lowers every destination field to the corresponding event value when the event value is smaller. The event
   * value for destination slot {@code i} is read at the position given by the matching index subset in
   * {@code context.indexSubsetAggregates}.
   *
   * @param dest The running aggregate, updated in place.
   * @param src  The event to fold in.
   */
  @Override
  public void aggregate(Aggregate dest, InputEvent src)
  {
    final GPOMutable running = dest.getAggregates();
    final GPOMutable incoming = src.getAggregates();

    final byte[] runningBytes = running.getFieldsByte();
    if (runningBytes != null) {
      minIndirect(runningBytes, incoming.getFieldsByte(), context.indexSubsetAggregates.fieldsByteIndexSubset);
    }

    final short[] runningShorts = running.getFieldsShort();
    if (runningShorts != null) {
      minIndirect(runningShorts, incoming.getFieldsShort(), context.indexSubsetAggregates.fieldsShortIndexSubset);
    }

    final int[] runningInts = running.getFieldsInteger();
    if (runningInts != null) {
      minIndirect(runningInts, incoming.getFieldsInteger(), context.indexSubsetAggregates.fieldsIntegerIndexSubset);
    }

    final long[] runningLongs = running.getFieldsLong();
    if (runningLongs != null) {
      minIndirect(runningLongs, incoming.getFieldsLong(), context.indexSubsetAggregates.fieldsLongIndexSubset);
    }

    final float[] runningFloats = running.getFieldsFloat();
    if (runningFloats != null) {
      minIndirect(runningFloats, incoming.getFieldsFloat(), context.indexSubsetAggregates.fieldsFloatIndexSubset);
    }

    final double[] runningDoubles = running.getFieldsDouble();
    if (runningDoubles != null) {
      minIndirect(runningDoubles, incoming.getFieldsDouble(), context.indexSubsetAggregates.fieldsDoubleIndexSubset);
    }
  }

  /**
   * Combines two aggregates slot by slot, keeping the smaller value of each pair in {@code dest}. Both aggregates
   * are indexed with the same position, so no index subset is involved.
   *
   * @param dest The aggregate updated in place.
   * @param src  The aggregate merged into {@code dest}.
   */
  @Override
  public void aggregate(Aggregate dest, Aggregate src)
  {
    final GPOMutable running = dest.getAggregates();
    final GPOMutable other = src.getAggregates();

    final byte[] runningBytes = running.getFieldsByte();
    if (runningBytes != null) {
      minDirect(runningBytes, other.getFieldsByte());
    }

    final short[] runningShorts = running.getFieldsShort();
    if (runningShorts != null) {
      minDirect(runningShorts, other.getFieldsShort());
    }

    final int[] runningInts = running.getFieldsInteger();
    if (runningInts != null) {
      minDirect(runningInts, other.getFieldsInteger());
    }

    final long[] runningLongs = running.getFieldsLong();
    if (runningLongs != null) {
      minDirect(runningLongs, other.getFieldsLong());
    }

    final float[] runningFloats = running.getFieldsFloat();
    if (runningFloats != null) {
      minDirect(runningFloats, other.getFieldsFloat());
    }

    final double[] runningDoubles = running.getFieldsDouble();
    if (runningDoubles != null) {
      minDirect(runningDoubles, other.getFieldsDouble());
    }
  }

  /**
   * Resolves the output type by looking {@code inputType} up in {@code AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP}.
   *
   * @param inputType The type of the aggregated field.
   * @return The entry stored for {@code inputType} in {@code AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP}.
   */
  @Override
  public Type getOutputType(Type inputType)
  {
    final Type mapped = AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType);
    return mapped;
  }

  /**
   * @return Always {@code null}; this aggregator supplies no metadata descriptor.
   */
  @Override
  public FieldsDescriptor getMetaDataDescriptor()
  {
    return null;
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly smaller (byte fields). */
  private static void minIndirect(byte[] dest, byte[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final byte candidate = src[srcIndices[slot]];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly smaller (short fields). */
  private static void minIndirect(short[] dest, short[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final short candidate = src[srcIndices[slot]];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly smaller (integer fields). */
  private static void minIndirect(int[] dest, int[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final int candidate = src[srcIndices[slot]];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly smaller (long fields). */
  private static void minIndirect(long[] dest, long[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final long candidate = src[srcIndices[slot]];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly smaller (float fields). */
  private static void minIndirect(float[] dest, float[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final float candidate = src[srcIndices[slot]];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[srcIndices[i]]} in {@code dest[i]} wherever it is strictly smaller (double fields). */
  private static void minIndirect(double[] dest, double[] src, int[] srcIndices)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final double candidate = src[srcIndices[slot]];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly smaller (byte fields). */
  private static void minDirect(byte[] dest, byte[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final byte candidate = src[slot];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly smaller (short fields). */
  private static void minDirect(short[] dest, short[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final short candidate = src[slot];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly smaller (integer fields). */
  private static void minDirect(int[] dest, int[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final int candidate = src[slot];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly smaller (long fields). */
  private static void minDirect(long[] dest, long[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final long candidate = src[slot];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly smaller (float fields). */
  private static void minDirect(float[] dest, float[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final float candidate = src[slot];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }

  /** Stores {@code src[i]} in {@code dest[i]} wherever it is strictly smaller (double fields). */
  private static void minDirect(double[] dest, double[] src)
  {
    for (int slot = 0; slot < dest.length; slot++) {
      final double candidate = src[slot];
      if (candidate < dest[slot]) {
        dest[slot] = candidate;
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java ---------------------------------------------------------------------- diff --git
a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java deleted file mode 100644 index fd711cb..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorOTFType.java +++ /dev/null @@ -1,89 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.datatorrent.lib.dimensions.aggregator;

import java.util.Collections;
import java.util.Map;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

/**
 * Convenience enum that gathers the default {@link OTFAggregator}s in one place and publishes a lookup table
 * keyed by constant name.
 *
 * @since 3.1.0
 */
public enum AggregatorOTFType
{
  /**
   * The average {@link OTFAggregator}, backed by {@code AggregatorAverage.INSTANCE}.
   */
  AVG(AggregatorAverage.INSTANCE);

  /**
   * Unmodifiable map from the {@link #name()} of each constant to its {@link OTFAggregator}.
   */
  public static final Map<String, OTFAggregator> NAME_TO_AGGREGATOR;

  // Runs after all constants exist, so values() is fully populated here.
  static {
    final AggregatorOTFType[] constants = AggregatorOTFType.values();
    final Map<String, OTFAggregator> aggregatorsByName = Maps.newHashMap();

    for (int position = 0; position < constants.length; position++) {
      final AggregatorOTFType constant = constants[position];
      aggregatorsByName.put(constant.name(), constant.getAggregator());
    }

    NAME_TO_AGGREGATOR = Collections.unmodifiableMap(aggregatorsByName);
  }

  /**
   * The {@link OTFAggregator} owned by this constant; checked to be non-null in the constructor.
   */
  private final OTFAggregator aggregator;

  /**
   * Binds the given {@link OTFAggregator} to this constant.
   *
   * @param aggregator The {@link OTFAggregator} to own; must not be {@code null}.
   * @throws NullPointerException if {@code aggregator} is {@code null}.
   */
  AggregatorOTFType(OTFAggregator aggregator)
  {
    this.aggregator = Preconditions.checkNotNull(aggregator);
  }

  /**
   * Gets the {@link OTFAggregator} owned by this constant.
   *
   * @return The {@link OTFAggregator} owned by this constant.
   */
  public OTFAggregator getAggregator()
  {
    return aggregator;
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java deleted file mode 100644 index ff5a75d..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorRegistry.java +++ /dev/null @@ -1,424 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.dimensions.aggregator; - -import java.io.Serializable; -import java.util.List; -import java.util.Map; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * <p> - * This registry is used by generic dimensions computation operators and dimension stores in order to support - * plugging different - * aggregators into the operator. Subclasses of - * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema} use this registry - * to support pluggable aggregators when doing dimensions computation, and Subclasses of - * AppDataSingleSchemaDimensionStoreHDHT use this class as well. - * </p> - * <p> - * The primary purpose of an {@link AggregatorRegistry} is to provide a mapping from aggregator names to aggregators, - * and to provide mappings from aggregator IDs to aggregators. These mappings are necessary in order to correctly - * process schemas, App Data queries, and store aggregated data. 
- * </p> - * - * @since 3.1.0 - */ -public class AggregatorRegistry implements Serializable -{ - private static final long serialVersionUID = 20154301642L; - - /** - * This is a map from {@link IncrementalAggregator} names to {@link IncrementalAggregator}s used by the - * default {@link AggregatorRegistry}. - */ - private static final transient Map<String, IncrementalAggregator> DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR; - /** - * This is a map from {@link OTFAggregator} names to {@link OTFAggregator}s used by the default - * {@link AggregatorRegistry}. - */ - private static final transient Map<String, OTFAggregator> DEFAULT_NAME_TO_OTF_AGGREGATOR; - - //Build the default maps - static { - DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR = Maps.newHashMap(AggregatorIncrementalType.NAME_TO_AGGREGATOR); - DEFAULT_NAME_TO_OTF_AGGREGATOR = Maps.newHashMap(AggregatorOTFType.NAME_TO_AGGREGATOR); - } - - /** - * This is a default aggregator registry that can be used in operators. - */ - public static final AggregatorRegistry DEFAULT_AGGREGATOR_REGISTRY = new AggregatorRegistry( - DEFAULT_NAME_TO_INCREMENTAL_AGGREGATOR, DEFAULT_NAME_TO_OTF_AGGREGATOR, - AggregatorIncrementalType.NAME_TO_ORDINAL); - - /** - * This is a flag indicating whether or not this {@link AggregatorRegistry} has been setup before or not. - */ - private transient boolean setup = false; - /** - * This is a map from the class of an {@link IncrementalAggregator} to the name of that - * {@link IncrementalAggregator}. - */ - private transient Map<Class<? extends IncrementalAggregator>, String> classToIncrementalAggregatorName; - /** - * This is a map from the name of an {@link OTFAggregator} to the list of the names of all - * {@link IncrementalAggregator} that are child aggregators of that {@link OTFAggregator}. 
- */ - private transient Map<String, List<String>> otfAggregatorToIncrementalAggregators; - /** - * This is a map from the aggregator ID of an - * {@link IncrementalAggregator} to the corresponding {@link IncrementalAggregator}. - */ - private transient Map<Integer, IncrementalAggregator> incrementalAggregatorIDToAggregator; - /** - * This is a map from the name assigned to an {@link IncrementalAggregator} to the {@link IncrementalAggregator}. - */ - private Map<String, IncrementalAggregator> nameToIncrementalAggregator; - /** - * This is a map from the name assigned to an {@link OTFAggregator} to the {@link OTFAggregator}. - */ - private Map<String, OTFAggregator> nameToOTFAggregator; - /** - * This is a map from the name of an {@link IncrementalAggregator} to the ID of that {@link IncrementalAggregator}. - */ - private Map<String, Integer> incrementalAggregatorNameToID; - - /** - * This is a helper method used to autogenerate the IDs for each {@link IncrementalAggregator} - * - * @param nameToAggregator A mapping from the name of an {@link IncrementalAggregator} to the - * {@link IncrementalAggregator}. - * @return A mapping from the name of an {@link IncrementalAggregator} to the ID assigned to that - * {@link IncrementalAggregator}. - */ - private static Map<String, Integer> autoGenIds(Map<String, IncrementalAggregator> nameToAggregator) - { - Map<String, Integer> staticAggregatorNameToID = Maps.newHashMap(); - - for (Map.Entry<String, IncrementalAggregator> entry : nameToAggregator.entrySet()) { - staticAggregatorNameToID.put(entry.getKey(), stringHash(entry.getValue().getClass().getName())); - } - - return staticAggregatorNameToID; - } - - /** - * This is a helper method for computing the hash of the string. This is intended to be a static unchanging - * method since the computed hash is used for aggregator IDs which are used for persistence. - * <p> - * <b>Note:</b> Do not change this function it will cause corruption for users updating existing data stores. 
- * </p> - * - * @return The hash of the given string. - */ - private static int stringHash(String string) - { - int hash = 5381; - - for (int index = 0; - index < string.length(); - index++) { - int character = (int)string.charAt(index); - hash = hash * 33 + character; - } - - return hash; - } - - /** - * This constructor is present for Kryo serialization - */ - private AggregatorRegistry() - { - //for kryo - } - - /** - * <p> - * This creates an {@link AggregatorRegistry} which assigns the given names to the given - * {@link IncrementalAggregator}s and {@link OTFAggregator}s. This constructor also auto-generates - * the IDs associated with each {@link IncrementalAggregator} by computing the hashcode of the - * fully qualified class name of each {@link IncrementalAggregator}. - * </p> - * <p> - * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the - * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored. - * </p> - * - * @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator}, - * where the string is the name of an - * {@link IncrementalAggregator} and the value is the {@link IncrementalAggregator} - * with that name. - * @param nameToOTFAggregator This is a map from {@link String} to {@link OTFAggregator}, where the string - * is the name of - * an {@link OTFAggregator} and the value is the {@link OTFAggregator} with that - * name. - */ - public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator, - Map<String, OTFAggregator> nameToOTFAggregator) - { - this(nameToIncrementalAggregator, - nameToOTFAggregator, - autoGenIds(nameToIncrementalAggregator)); - } - - /** - * <p> - * This creates an {@link AggregatorRegistry} which assigns the given names to the given - * {@link IncrementalAggregator}s and {@link OTFAggregator}s. 
This constructor assigns IDs to each - * {@link IncrementalAggregator} by using the provided map from incremental aggregator names to IDs. - * </p> - * <p> - * <b>Note:</b> IDs only need to be generated for {@link IncrementalAggregator}s since they are the - * only type of stored aggregations. {@link OTFAggregator}s do not require an ID since they are not stored. - * </p> - * - * @param nameToIncrementalAggregator This is a map from {@link String} to {@link IncrementalAggregator}, - * where the string is the name of an - * {@link IncrementalAggregator} and the value is the - * {@link IncrementalAggregator} - * with that name. - * @param nameToOTFAggregator This is a map from {@link String} to {@link OTFAggregator}, where the - * string is the name of - * an {@link OTFAggregator} and the value is the {@link OTFAggregator} with - * that name. - * @param incrementalAggregatorNameToID This is a map from the name of an {@link IncrementalAggregator} to the ID - * for that - * {@link IncrementalAggregator}. - */ - public AggregatorRegistry(Map<String, IncrementalAggregator> nameToIncrementalAggregator, - Map<String, OTFAggregator> nameToOTFAggregator, - Map<String, Integer> incrementalAggregatorNameToID) - { - setNameToIncrementalAggregator(nameToIncrementalAggregator); - setNameToOTFAggregator(nameToOTFAggregator); - - setIncrementalAggregatorNameToID(incrementalAggregatorNameToID); - - validate(); - } - - /** - * This is a helper method which is used to do validation on the maps provided to the constructor of this class. 
- */ - private void validate() - { - for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) { - Preconditions.checkNotNull(entry.getKey()); - Preconditions.checkNotNull(entry.getValue()); - } - - for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) { - Preconditions.checkNotNull(entry.getKey()); - Preconditions.checkNotNull(entry.getValue()); - } - - for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) { - Preconditions.checkNotNull(entry.getKey()); - Preconditions.checkNotNull(entry.getValue()); - } - } - - /** - * This method is called to initialize various internal datastructures of the {@link AggregatorRegistry}. - * This method should be called before the {@link AggregatorRegistry} is used. - */ - @SuppressWarnings({"unchecked", "rawtypes"}) - public void setup() - { - if (setup) { - //If the AggregatorRegistry was already setup. Don't set it up again. - return; - } - - setup = true; - - classToIncrementalAggregatorName = Maps.newHashMap(); - - for (Map.Entry<String, IncrementalAggregator> entry : nameToIncrementalAggregator.entrySet()) { - classToIncrementalAggregatorName.put((Class)entry.getValue().getClass(), entry.getKey()); - } - - incrementalAggregatorIDToAggregator = Maps.newHashMap(); - - for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) { - String aggregatorName = entry.getKey(); - int aggregatorID = entry.getValue(); - incrementalAggregatorIDToAggregator.put(aggregatorID, - nameToIncrementalAggregator.get(aggregatorName)); - } - - otfAggregatorToIncrementalAggregators = Maps.newHashMap(); - - for (Map.Entry<String, OTFAggregator> entry : nameToOTFAggregator.entrySet()) { - String name = entry.getKey(); - List<String> staticAggregators = Lists.newArrayList(); - - OTFAggregator dotfAggregator = nameToOTFAggregator.get(name); - - for (Class clazz : dotfAggregator.getChildAggregators()) { - 
staticAggregators.add(classToIncrementalAggregatorName.get(clazz)); - } - - otfAggregatorToIncrementalAggregators.put(name, staticAggregators); - } - } - - /** - * This is a helper method which sets and validated the given mapping from an {@link IncrementalAggregator}'s name - * to an {@link IncrementalAggregator}. - * - * @param nameToIncrementalAggregator The mapping from an {@link IncrementalAggregator}'s name to an - * {@link IncrementalAggregator}. - */ - private void setNameToIncrementalAggregator(Map<String, IncrementalAggregator> nameToIncrementalAggregator) - { - this.nameToIncrementalAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToIncrementalAggregator)); - } - - /** - * This is a helper method which sets and validates the given mapping from an {@link OTFAggregator}'s name to - * an {@link OTFAggregator}. - * - * @param nameToOTFAggregator The mapping from an {@link OTFAggregator}'s name to an {@link OTFAggregator}. - */ - private void setNameToOTFAggregator(Map<String, OTFAggregator> nameToOTFAggregator) - { - this.nameToOTFAggregator = Maps.newHashMap(Preconditions.checkNotNull(nameToOTFAggregator)); - } - - /** - * Checks if the given aggregatorName is the name of an {@link IncrementalAggregator} or {@link OTFAggregator} - * registered to this registry. - * - * @param aggregatorName The aggregator name to check. - * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered to - * this registry. False otherwise. - */ - public boolean isAggregator(String aggregatorName) - { - return classToIncrementalAggregatorName.values().contains(aggregatorName) || - nameToOTFAggregator.containsKey(aggregatorName); - } - - /** - * Checks if the given aggregator name is the name of an {@link IncrementalAggregator} registered - * to this registry. - * - * @param aggregatorName The aggregator name to check. 
- * @return True if the given aggregator name is the name of an {@link IncrementalAggregator} registered - * to this registry. False otherwise. - */ - public boolean isIncrementalAggregator(String aggregatorName) - { - return classToIncrementalAggregatorName.values().contains(aggregatorName); - } - - /** - * Gets the mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}. - * - * @return The mapping from an {@link IncrementalAggregator}'s class to the {@link IncrementalAggregator}. - */ - public Map<Class<? extends IncrementalAggregator>, String> getClassToIncrementalAggregatorName() - { - return classToIncrementalAggregatorName; - } - - /** - * Gets the mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}. - * - * @return The mapping from an {@link IncrementalAggregator}'s ID to the {@link IncrementalAggregator}. - */ - public Map<Integer, IncrementalAggregator> getIncrementalAggregatorIDToAggregator() - { - return incrementalAggregatorIDToAggregator; - } - - /** - * This a helper method which sets and validates the mapping from {@link IncrementalAggregator} name to - * {@link IncrementalAggregator} ID. - * - * @param incrementalAggregatorNameToID The mapping from {@link IncrementalAggregator} name to - * {@link IncrementalAggregator} ID. - */ - private void setIncrementalAggregatorNameToID(Map<String, Integer> incrementalAggregatorNameToID) - { - Preconditions.checkNotNull(incrementalAggregatorNameToID); - - for (Map.Entry<String, Integer> entry : incrementalAggregatorNameToID.entrySet()) { - Preconditions.checkNotNull(entry.getKey()); - Preconditions.checkNotNull(entry.getValue()); - } - - this.incrementalAggregatorNameToID = Maps.newHashMap(incrementalAggregatorNameToID); - } - - /** - * This returns a map from the names of an {@link IncrementalAggregator}s to the corresponding ID of the - * {@link IncrementalAggregator}. 
- * - * @return Returns a map from the names of an {@link IncrementalAggregator} to the corresponding ID of the - * {@link IncrementalAggregator}. - */ - public Map<String, Integer> getIncrementalAggregatorNameToID() - { - return incrementalAggregatorNameToID; - } - - /** - * Returns the name to {@link OTFAggregator} mapping, where the key is the name of the {@link OTFAggregator}. - * - * @return The name to {@link OTFAggregator} mapping. - */ - public Map<String, OTFAggregator> getNameToOTFAggregators() - { - return nameToOTFAggregator; - } - - /** - * Returns the mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of - * that {@link OTFAggregator}. - * - * @return The mapping from {@link OTFAggregator} names to a list of names of all the child aggregators of - * that {@link OTFAggregator}. - */ - public Map<String, List<String>> getOTFAggregatorToIncrementalAggregators() - { - return otfAggregatorToIncrementalAggregators; - } - - /** - * Returns the name to {@link IncrementalAggregator} mapping, where the key is the name of the {@link OTFAggregator}. - * - * @return The name to {@link IncrementalAggregator} mapping. 
- */ - public Map<String, IncrementalAggregator> getNameToIncrementalAggregator() - { - return nameToIncrementalAggregator; - } - - private static final Logger lOG = LoggerFactory.getLogger(AggregatorRegistry.class); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java deleted file mode 100644 index c68744b..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorSum.java +++ /dev/null @@ -1,254 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.dimensions.aggregator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.datatorrent.api.annotation.Name; -import com.datatorrent.lib.appdata.gpo.GPOMutable; -import com.datatorrent.lib.appdata.gpo.GPOUtils; -import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; -import com.datatorrent.lib.appdata.schemas.Type; -import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; -import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; - -/** - * This {@link IncrementalAggregator} performs a sum operation over the fields in the given {@link InputEvent}. - * - * @since 3.1.0 - */ -@Name("SUM") -public class AggregatorSum extends AbstractIncrementalAggregator -{ - private static final long serialVersionUID = 20154301649L; - - public AggregatorSum() - { - //Do nothing - } - - @Override - public Aggregate getGroup(InputEvent src, int aggregatorIndex) - { - src.used = true; - Aggregate aggregate = createAggregate(src, - context, - aggregatorIndex); - - GPOMutable value = aggregate.getAggregates(); - GPOUtils.zeroFillNumeric(value); - - return aggregate; - } - - @Override - public void aggregate(Aggregate dest, Aggregate src) - { - GPOMutable destAggs = dest.getAggregates(); - GPOMutable srcAggs = src.getAggregates(); - - aggregateAggs(destAggs, srcAggs); - } - - public void aggregateAggs(GPOMutable destAggs, GPOMutable srcAggs) - { - { - byte[] destByte = destAggs.getFieldsByte(); - if (destByte != null) { - byte[] srcByte = srcAggs.getFieldsByte(); - - for (int index = 0; - index < destByte.length; - index++) { - destByte[index] += srcByte[index]; - } - } - } - - { - short[] destShort = destAggs.getFieldsShort(); - if (destShort != null) { - short[] srcShort = srcAggs.getFieldsShort(); - - for (int index = 0; - index < destShort.length; - index++) { - destShort[index] += srcShort[index]; - } - } - } - - { - int[] destInteger = destAggs.getFieldsInteger(); - if (destInteger != null) { - int[] 
srcInteger = srcAggs.getFieldsInteger(); - - for (int index = 0; - index < destInteger.length; - index++) { - destInteger[index] += srcInteger[index]; - } - } - } - - { - long[] destLong = destAggs.getFieldsLong(); - if (destLong != null) { - long[] srcLong = srcAggs.getFieldsLong(); - - for (int index = 0; - index < destLong.length; - index++) { - destLong[index] += srcLong[index]; - } - } - } - - { - float[] destFloat = destAggs.getFieldsFloat(); - if (destFloat != null) { - float[] srcFloat = srcAggs.getFieldsFloat(); - - for (int index = 0; - index < destFloat.length; - index++) { - destFloat[index] += srcFloat[index]; - } - } - } - - { - double[] destDouble = destAggs.getFieldsDouble(); - if (destDouble != null) { - double[] srcDouble = srcAggs.getFieldsDouble(); - - for (int index = 0; - index < destDouble.length; - index++) { - destDouble[index] += srcDouble[index]; - } - } - } - } - - @Override - public void aggregate(Aggregate dest, InputEvent src) - { - GPOMutable destAggs = dest.getAggregates(); - GPOMutable srcAggs = src.getAggregates(); - - aggregateInput(destAggs, srcAggs); - } - - public void aggregateInput(GPOMutable destAggs, GPOMutable srcAggs) - { - { - byte[] destByte = destAggs.getFieldsByte(); - if (destByte != null) { - byte[] srcByte = srcAggs.getFieldsByte(); - int[] srcIndices = context.indexSubsetAggregates.fieldsByteIndexSubset; - for (int index = 0; - index < destByte.length; - index++) { - destByte[index] += srcByte[srcIndices[index]]; - } - } - } - - { - short[] destShort = destAggs.getFieldsShort(); - if (destShort != null) { - short[] srcShort = srcAggs.getFieldsShort(); - int[] srcIndices = context.indexSubsetAggregates.fieldsShortIndexSubset; - for (int index = 0; - index < destShort.length; - index++) { - destShort[index] += srcShort[srcIndices[index]]; - } - } - } - - { - int[] destInteger = destAggs.getFieldsInteger(); - if (destInteger != null) { - int[] srcInteger = srcAggs.getFieldsInteger(); - int[] srcIndices = 
context.indexSubsetAggregates.fieldsIntegerIndexSubset; - for (int index = 0; - index < destInteger.length; - index++) { - destInteger[index] += srcInteger[srcIndices[index]]; - } - } - } - - { - long[] destLong = destAggs.getFieldsLong(); - if (destLong != null) { - long[] srcLong = srcAggs.getFieldsLong(); - int[] srcIndices = context.indexSubsetAggregates.fieldsLongIndexSubset; - for (int index = 0; - index < destLong.length; - index++) { - destLong[index] += srcLong[srcIndices[index]]; - } - } - } - - { - float[] destFloat = destAggs.getFieldsFloat(); - if (destFloat != null) { - float[] srcFloat = srcAggs.getFieldsFloat(); - int[] srcIndices = context.indexSubsetAggregates.fieldsFloatIndexSubset; - for (int index = 0; - index < destFloat.length; - index++) { - destFloat[index] += srcFloat[srcIndices[index]]; - } - } - } - - { - double[] destDouble = destAggs.getFieldsDouble(); - if (destDouble != null) { - double[] srcDouble = srcAggs.getFieldsDouble(); - int[] srcIndices = context.indexSubsetAggregates.fieldsDoubleIndexSubset; - for (int index = 0; - index < destDouble.length; - index++) { - destDouble[index] += srcDouble[srcIndices[index]]; - } - } - } - } - - @Override - public Type getOutputType(Type inputType) - { - return AggregatorUtils.IDENTITY_NUMBER_TYPE_MAP.get(inputType); - } - - @Override - public FieldsDescriptor getMetaDataDescriptor() - { - return null; - } - - private static final Logger LOG = LoggerFactory.getLogger(AggregatorSum.class); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java deleted file mode 100644 index 9643310..0000000 --- 
a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/AggregatorUtils.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.dimensions.aggregator; - -import java.util.Collections; -import java.util.Map; - -import com.google.common.collect.Maps; - -import com.datatorrent.lib.appdata.schemas.Fields; -import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; -import com.datatorrent.lib.appdata.schemas.Type; - -/** - * This class contains utility methods which are useful for aggregators. - * - * @since 3.1.0 - */ -public final class AggregatorUtils -{ - /** - * This is an identity type map, which maps input types to the same output types. - */ - public static final transient Map<Type, Type> IDENTITY_TYPE_MAP; - /** - * This is an identity type map, for numeric types only. This is - * helpful when creating aggregators like {@link AggregatorSum}, where the sum of ints is an - * int and the sum of floats is a float. 
- */ - public static final transient Map<Type, Type> IDENTITY_NUMBER_TYPE_MAP; - - static { - Map<Type, Type> identityTypeMap = Maps.newHashMap(); - - for (Type type : Type.values()) { - identityTypeMap.put(type, type); - } - - IDENTITY_TYPE_MAP = Collections.unmodifiableMap(identityTypeMap); - - Map<Type, Type> identityNumberTypeMap = Maps.newHashMap(); - - for (Type type : Type.NUMERIC_TYPES) { - identityNumberTypeMap.put(type, type); - } - - IDENTITY_NUMBER_TYPE_MAP = Collections.unmodifiableMap(identityNumberTypeMap); - } - - /** - * Don't instantiate this class. - */ - private AggregatorUtils() - { - //Don't instantiate this class. - } - - /** - * This is a helper method which takes a {@link FieldsDescriptor} object, which defines the types of the fields - * that the {@link IncrementalAggregator} receives as input. It then uses the given {@link IncrementalAggregator} - * and {@link FieldsDescriptor} object to compute the {@link FieldsDescriptor} object for the aggregation produced - * byte the given - * {@link IncrementalAggregator} when it receives an input corresponding to the given input {@link FieldsDescriptor}. - * - * @param inputFieldsDescriptor This is a {@link FieldsDescriptor} object which defines the names and types of input - * data recieved by an aggregator. - * @param incrementalAggregator This is the - * {@link IncrementalAggregator} for which an output {@link FieldsDescriptor} needs - * to be computed. - * @return The output {@link FieldsDescriptor} for this aggregator when it receives input data with the same schema as - * the specified input {@link FieldsDescriptor}. 
- */ - public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor, - IncrementalAggregator incrementalAggregator) - { - Map<String, Type> fieldToType = Maps.newHashMap(); - - for (Map.Entry<String, Type> entry : - inputFieldsDescriptor.getFieldToType().entrySet()) { - String fieldName = entry.getKey(); - Type fieldType = entry.getValue(); - Type outputType = incrementalAggregator.getOutputType(fieldType); - fieldToType.put(fieldName, outputType); - } - - return new FieldsDescriptor(fieldToType); - } - - /** - * This is a utility method which creates an output {@link FieldsDescriptor} using the field names - * from the given {@link FieldsDescriptor} and the output type of the given {@link OTFAggregator}. - * - * @param inputFieldsDescriptor The {@link FieldsDescriptor} from which to derive the field names used - * for the output fields descriptor. - * @param otfAggregator The {@link OTFAggregator} to use for creating the output {@link FieldsDescriptor}. - * @return The output {@link FieldsDescriptor}. - */ - public static FieldsDescriptor getOutputFieldsDescriptor(FieldsDescriptor inputFieldsDescriptor, - OTFAggregator otfAggregator) - { - Map<String, Type> fieldToType = Maps.newHashMap(); - - for (Map.Entry<String, Type> entry : - inputFieldsDescriptor.getFieldToType().entrySet()) { - String fieldName = entry.getKey(); - Type outputType = otfAggregator.getOutputType(); - fieldToType.put(fieldName, outputType); - } - - return new FieldsDescriptor(fieldToType); - } - - /** - * This is a utility method which creates an output {@link FieldsDescriptor} from the - * given field names and the given {@link OTFAggregator}. - * - * @param fields The names of the fields to be included in the output {@link FieldsDescriptor}. - * @param otfAggregator The {@link OTFAggregator} to use when creating the output {@link FieldsDescriptor}. - * @return The output {@link FieldsDescriptor}. 
- */ - public static FieldsDescriptor getOutputFieldsDescriptor(Fields fields, - OTFAggregator otfAggregator) - { - Map<String, Type> fieldToType = Maps.newHashMap(); - - for (String field : fields.getFields()) { - fieldToType.put(field, otfAggregator.getOutputType()); - } - - return new FieldsDescriptor(fieldToType); - } -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java deleted file mode 100644 index 2825e0a..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/IncrementalAggregator.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package com.datatorrent.lib.dimensions.aggregator; - -import com.datatorrent.lib.appdata.schemas.FieldsDescriptor; -import com.datatorrent.lib.appdata.schemas.Type; -import com.datatorrent.lib.dimensions.DimensionsConversionContext; -import com.datatorrent.lib.dimensions.DimensionsEvent.Aggregate; -import com.datatorrent.lib.dimensions.DimensionsEvent.InputEvent; -import com.datatorrent.lib.dimensions.aggregator.AggregateEvent.Aggregator; - -/** - * <p> - * {@link IncrementalAggregator}s perform aggregations in place, on a field by field basis. For example if we have a - * field cost, an incremental aggregator would take a new value of cost and aggregate it to an aggregate value for - * cost. No fields except the cost field are used in the computation of the cost aggregation in the case of an - * {@link IncrementalAggregator}. - * </p> - * <p> - * {@link IncrementalAggregator}s are intended to be used with subclasses of - * {@link com.datatorrent.lib.dimensions.AbstractDimensionsComputationFlexibleSingleSchema}. The way in which - * {@link IncrementalAggregator}s are used in this context is that a batch of fields to be aggregated by the aggregator - * are provided in the form of an {@link InputEvent}. For example, if there are two fields (cost and revenue), which - * will be aggregated by a sum aggregator, both of those fields will be included in the {@link InputEvent} passed to - * the sum aggregator. And the {DimensionsEventregate} event produced by the sum aggregator will contain two fields, - * one for cost and one for revenue. - * </p> - * - */ -public interface IncrementalAggregator extends Aggregator<InputEvent, Aggregate> -{ - /** - * This method defines the type mapping for the {@link IncrementalAggregator}. The type mapping defines the - * relationship between the type of an input field and the type of its aggregate. 
For example if the aggregator takes - * a field of type int and produces an aggregate of type float, then this method would return a type of float when - * the given input type is an int. - * @param inputType The type of a field to be aggregate. - * @return The type of the aggregate corresponding to an input field of the given type. - */ - public Type getOutputType(Type inputType); - - /** - * This sets - */ - public void setDimensionsConversionContext(DimensionsConversionContext context); - - /** - * Returns a {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations. - * This method returns null if this aggregator stores no metadata. - * @return A {@link FieldsDescriptor} object which describes the meta data that is stored along with aggregations. - * This method returns null if this aggregator stores no metadata. - */ - public FieldsDescriptor getMetaDataDescriptor(); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java b/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java deleted file mode 100644 index e5d8638..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/aggregator/OTFAggregator.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package com.datatorrent.lib.dimensions.aggregator; - -import java.io.Serializable; - -import java.util.List; - -import com.datatorrent.lib.appdata.gpo.GPOMutable; -import com.datatorrent.lib.appdata.schemas.Type; - -/** - * <p> - * This interface represents an On The Fly Aggregator. On the fly aggregators represent a class - * of aggregations which use the results of incremental aggregators, which implement the - * {@link com.datatorrent.lib.dimensions.aggregator.IncrementalAggregator} interface. An example of an aggregation which - * needs to be performed on the fly is average. Average needs to be performed on the fly because average cannot be - * computed with just an existing average and a new data item, an average required the sum of all data items, and the - * count of all data items. An example implementation of average is {@link AggregatorAverage}. Also note - * that unlike {@link IncrementalAggregator}s an {@link OTFAggregator} only has one output type. This done - * because {@link OTFAggregator}s usually represent a very specific computation, with a specific output type. - * For example, average is a computation that you will almost always want to produce a double. But if you require - * an average operation that produces an integer, that could be done as a separate {@link OTFAggregator}. - * </p> - * <p> - * The primary usage for {@link OTFAggregator}s are in store operators which respond to queries. Currently, - * the only places which utilize {@link OTFAggregator}s are subclasses of the DimensionsStoreHDHT operator. 
- * </p> - * <p> - * This interface extends {@link Serializable} because On The Fly aggregators may be set - * as properties on some operators and operator properties are required to be java serializable. - * </p> - * @since 3.1.0 - */ -public interface OTFAggregator extends Serializable -{ - public static final long serialVersionUID = 201505251039L; - - /** - * This method returns all the incremental aggregators on which this aggregator depends on - * to compute its result. In the case of {@link AggregatorAverage} it's child aggregators are - * {@link AggregatorCount} and {@link AggregatorSum}. - * @return All the incremental aggregators on which this aggregator depends on to compute its - * result. - */ - - public List<Class<? extends IncrementalAggregator>> getChildAggregators(); - /** - * This method performs an on the fly aggregation from the given aggregates. The aggregates - * provided to this aggregator are each the result of one of this aggregators child aggregators. - * The order in which the aggregates are passed to this method is the same as the order in - * which the child aggregators are listed in the result of the {@link #getChildAggregators} method. - * Also note that this aggregator does not aggregate one field at a time. This aggregator recieves - * a batch of fields from each child aggregator, and the result of the method is also a batch of fields. - * @param aggregates These are the results of all the child aggregators. The results are in the same - * order as the child aggregators specified in the result of the {@link #getChildAggregators} method. - * @return The result of the on the fly aggregation. - */ - - public GPOMutable aggregate(GPOMutable... aggregates); - /** - * Returns the output type of the {@link OTFAggregator}. <b>Note<b> that any combination of input types - * will produce the same output type for {@link OTFAggregator}s. - * @return The output type of the {@link OTFAggregator}. 
- */ - - public Type getOutputType(); -} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java b/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java deleted file mode 100644 index 4988df7..0000000 --- a/library/src/main/java/com/datatorrent/lib/dimensions/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ [email protected] -package com.datatorrent.lib.dimensions; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java new file mode 100644 index 0000000..216e577 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/CustomTimeBucketRegistry.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.apex.malhar.lib.dimensions; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.base.Preconditions; + +import com.datatorrent.lib.appdata.schemas.CustomTimeBucket; + +import it.unimi.dsi.fastutil.ints.Int2ObjectMap; +import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; + +/** + * @since 3.3.0 + */ +public class CustomTimeBucketRegistry implements Serializable +{ + private static final long serialVersionUID = 201509221536L; + + private int currentId; + + private Int2ObjectMap<CustomTimeBucket> idToTimeBucket = new Int2ObjectOpenHashMap<>(); + private Object2IntMap<CustomTimeBucket> timeBucketToId = new Object2IntOpenHashMap<>(); + private Map<String, CustomTimeBucket> textToTimeBucket = new HashMap<>(); + + public CustomTimeBucketRegistry() + { + } + + public CustomTimeBucketRegistry(int startingId) + { + this.currentId = startingId; + } + + public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket) + { + initialize(idToTimeBucket); + } + + public CustomTimeBucketRegistry(Int2ObjectMap<CustomTimeBucket> idToTimeBucket, int startingId) + { + int tempId = initialize(idToTimeBucket); + + Preconditions.checkArgument(tempId < startingId, "The statingId " + startingId + + " must be larger than the largest ID " + tempId + " in the given idToTimeBucket mapping"); + + this.idToTimeBucket = Preconditions.checkNotNull(idToTimeBucket); + this.currentId = startingId; + } + + private int initialize(Int2ObjectMap<CustomTimeBucket> idToTimeBucket) + { + Preconditions.checkNotNull(idToTimeBucket); + + int tempId = Integer.MIN_VALUE; + + for (int timeBucketId : idToTimeBucket.keySet()) { + tempId = Math.max(tempId, timeBucketId); + CustomTimeBucket customTimeBucket = idToTimeBucket.get(timeBucketId); + textToTimeBucket.put(customTimeBucket.getText(), 
customTimeBucket); + Preconditions.checkNotNull(customTimeBucket); + timeBucketToId.put(customTimeBucket, timeBucketId); + } + + return tempId; + } + + public CustomTimeBucket getTimeBucket(int timeBucketId) + { + return idToTimeBucket.get(timeBucketId); + } + + public Integer getTimeBucketId(CustomTimeBucket timeBucket) + { + if (!timeBucketToId.containsKey(timeBucket)) { + return null; + } + + return timeBucketToId.get(timeBucket); + } + + public CustomTimeBucket getTimeBucket(String text) + { + return textToTimeBucket.get(text); + } + + public void register(CustomTimeBucket timeBucket) + { + register(timeBucket, currentId); + } + + public void register(CustomTimeBucket timeBucket, int timeBucketId) + { + if (timeBucketToId.containsKey(timeBucket)) { + throw new IllegalArgumentException("The timeBucket " + timeBucket + " is already registered."); + } + + if (timeBucketToId.containsValue(timeBucketId)) { + throw new IllegalArgumentException("The timeBucketId " + timeBucketId + " is already registered."); + } + + idToTimeBucket.put(timeBucketId, timeBucket); + timeBucketToId.put(timeBucket, timeBucketId); + + if (timeBucketId >= currentId) { + currentId = timeBucketId + 1; + } + + textToTimeBucket.put(timeBucket.getText(), timeBucket); + } + + @Override + public String toString() + { + return "CustomTimeBucketRegistry{" + "idToTimeBucket=" + idToTimeBucket + '}'; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/c5cab8bd/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java new file mode 100644 index 0000000..90d4f7b --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/dimensions/DimensionsConversionContext.java @@ -0,0 
+1,116 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.apex.malhar.lib.dimensions;

import java.io.Serializable;

import com.datatorrent.lib.appdata.gpo.GPOUtils.IndexSubset;
import com.datatorrent.lib.appdata.schemas.FieldsDescriptor;

/**
 * Context object used by
 * {@link org.apache.apex.malhar.lib.dimensions.aggregator.IncrementalAggregator}s to convert
 * {@code InputEvent}s into {@code Aggregate}s. In the descriptions below, "the aggregator" is the
 * incremental aggregator that holds this context.
 *
 * @since 3.3.0
 */
public class DimensionsConversionContext implements Serializable
{
  private static final long serialVersionUID = 201506151157L;

  /**
   * The {@link CustomTimeBucketRegistry} carried by this context.
   */
  public CustomTimeBucketRegistry customTimeBucketRegistry;

  /**
   * Schema ID of the {@code Aggregate}s emitted by the aggregator.
   */
  public int schemaID;

  /**
   * Dimensions descriptor ID of the {@code Aggregate}s emitted by the aggregator.
   */
  public int dimensionsDescriptorID;

  /**
   * Aggregator ID of the {@code Aggregate}s emitted by the aggregator.
   */
  public int aggregatorID;

  /**
   * The {@link DimensionsDescriptor} that corresponds to the dimensions descriptor ID.
   */
  public DimensionsDescriptor dd;

  /**
   * {@link FieldsDescriptor} of the aggregate part of the {@code Aggregate}s emitted by the
   * aggregator.
   */
  public FieldsDescriptor aggregateDescriptor;

  /**
   * {@link FieldsDescriptor} of the key part of the {@code Aggregate}s emitted by the aggregator.
   */
  public FieldsDescriptor keyDescriptor;

  /**
   * Index of the timestamp field within the key of the {@code InputEvent}s received by the
   * aggregator, or -1 if the {@code InputEvent} key has no timestamp.
   */
  public int inputTimestampIndex;

  /**
   * Index of the timestamp field within the key of the {@code Aggregate}s emitted by the
   * aggregator, or -1 if the {@code Aggregate} key has no timestamp.
   */
  public int outputTimestampIndex;

  /**
   * Index of the time bucket field within the key of the {@code Aggregate}s emitted by the
   * aggregator, or -1 if the {@code Aggregate} key has no time bucket.
   */
  public int outputTimebucketIndex;

  /**
   * {@link IndexSubset} used to extract the key values from the {@code InputEvent}s received by
   * the aggregator.
   */
  public IndexSubset indexSubsetKeys;

  /**
   * {@link IndexSubset} used to extract the aggregate values from the {@code InputEvent}s
   * received by the aggregator.
   */
  public IndexSubset indexSubsetAggregates;

  /**
   * Creates a conversion context with every field left at its default value.
   */
  public DimensionsConversionContext()
  {
    // Nothing to initialize.
  }
}
