http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
index e12fea3..15e89ad 100755
--- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
+++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/FlatAggregator.java
@@ -26,36 +26,36 @@ import org.apache.eagle.query.aggregate.AggregateFunctionType;
 /**
  * Not thread safe
  */
-public class FlatAggregator extends AbstractAggregator{
-	protected GroupbyBucket bucket;
+public class FlatAggregator extends AbstractAggregator {
+    protected GroupbyBucket bucket;
 
     /**
      * @param groupbyFields
      * @param aggregateFuntionTypes
      * @param aggregatedFields
      */
-	public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){
-		super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
-		bucket = new GroupbyBucket(this.aggregateFunctionTypes);
-	}
-
-	public void accumulate(TaggedLogAPIEntity entity) throws Exception{
-		List<String> groupbyFieldValues = createGroup(entity);
-		List<Double> preAggregatedValues = createPreAggregatedValues(entity);
-		bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
-	}
-
-	public Map<List<String>, List<Double>> result(){
-		return bucket.result();
-	}
-
-	protected List<String> createGroup(TaggedLogAPIEntity entity){
-		List<String> groupbyFieldValues = new ArrayList<String>();
-		int i = 0;
-		for(String groupbyField : groupbyFields){
-			String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
-			groupbyFieldValues.add(groupbyFieldValue);
-		}
-		return groupbyFieldValues;
-	}
+    public FlatAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) {
+        super(groupbyFields, aggregateFuntionTypes, aggregatedFields);
+        bucket = new GroupbyBucket(this.aggregateFunctionTypes);
+    }
+
+    public void accumulate(TaggedLogAPIEntity entity) throws Exception {
+        List<String> groupbyFieldValues = createGroup(entity);
+        List<Double> preAggregatedValues = createPreAggregatedValues(entity);
+        bucket.addDatapoint(groupbyFieldValues, preAggregatedValues);
+    }
+
+    public Map<List<String>, List<Double>> result() {
+        return bucket.result();
+    }
+
+    protected List<String> createGroup(TaggedLogAPIEntity entity) {
+        List<String> groupbyFieldValues = new ArrayList<String>();
+        int i = 0;
+        for (String groupbyField : groupbyFields) {
+            String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i++);
+            groupbyFieldValues.add(groupbyFieldValue);
+        }
+        return groupbyFieldValues;
+    }
 }
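As a usage sketch (not part of this patch), the reformatted classes can be combined roughly as follows: FlatAggregator groups and pre-aggregates entities into a GroupbyBucket, and the result can then be ordered with SortOptionsParser and PostFlatAggregateSort, both of which appear later in this diff. The "host" tag and "value" field names are hypothetical; how they are resolved is handled by AbstractAggregator, which does not appear in this diff.

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
import org.apache.eagle.query.aggregate.AggregateFunctionType;
import org.apache.eagle.query.aggregate.timeseries.FlatAggregator;
import org.apache.eagle.query.aggregate.timeseries.PostFlatAggregateSort;
import org.apache.eagle.query.aggregate.timeseries.SortOption;
import org.apache.eagle.query.aggregate.timeseries.SortOptionsParser;

public class FlatAggregatorUsageSketch {
    // Sum the hypothetical "value" field per "host" tag and return the top 10 groups, largest sum first.
    public static List<Map.Entry<List<String>, List<Double>>> topHostsBySum(List<TaggedLogAPIEntity> entities) throws Exception {
        List<String> groupbyFields = Arrays.asList("host");
        List<String> aggregatedFields = Arrays.asList("value");
        FlatAggregator agg = new FlatAggregator(groupbyFields,
                Arrays.asList(AggregateFunctionType.sum), aggregatedFields);
        for (TaggedLogAPIEntity entity : entities) {
            agg.accumulate(entity); // not thread safe; wrap with SynchronizedAggregator if shared across threads
        }
        Map<List<String>, List<Double>> aggregated = agg.result(); // e.g. ["host1"] -> [123.0]

        // "value desc" sorts by the aggregated field, descending; topN = 10 keeps only the first 10 groups.
        List<SortOption> sortOptions = SortOptionsParser.parse(groupbyFields, aggregatedFields,
                Arrays.asList("value desc"), null);
        return PostFlatAggregateSort.sort(aggregated, sortOptions, 10);
    }
}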
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java index ea57edb..93e65a9 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyBucket.java @@ -31,211 +31,230 @@ import java.util.List; import java.util.Map; public class GroupbyBucket { - private final static Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class); - - public static Map<String, FunctionFactory> _functionFactories = - new HashMap<>(); - - // TODO put this logic to AggregatorFunctionType - static{ - _functionFactories.put(AggregateFunctionType.count.name(), new CountFactory()); - _functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory()); - _functionFactories.put(AggregateFunctionType.min.name(), new MinFactory()); - _functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory()); - _functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory()); - } - - private List<AggregateFunctionType> types; -// private SortedMap<List<String>, List<Function>> group2FunctionMap = -// new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator()); - - private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator()); - - public GroupbyBucket(List<AggregateFunctionType> types){ - this.types = types; - } - - public void addDatapoint(List<String> groupbyFieldValues, List<Double> values){ - // LOG.info("DEBUG: addDatapoint: groupby=["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]"); - - // locate groupby bucket - List<Function> functions = group2FunctionMap.get(groupbyFieldValues); - if(functions == null){ - functions = new ArrayList<Function>(); - for(AggregateFunctionType type : types){ - functions.add(_functionFactories.get(type.name()).createFunction()); - } - group2FunctionMap.put(groupbyFieldValues, functions); - } - int functionIndex = 0; - for(Double v : values){ - functions.get(functionIndex).run(v); - functionIndex++; - } - } - - public Map<List<String>, List<Double>> result(){ - Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>(); - for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){ - List<Double> values = new ArrayList<Double>(); - for(Function f : entry.getValue()){ - values.add(f.result()); - } - result.put(entry.getKey(), values); - } - return result; - } - - public List<GroupbyKeyValue> getGroupbyKeyValue(){ - List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>(); - - for(Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()){ - GroupbyKey key = new GroupbyKey(); - for(String keyStr:entry.getKey()){ - try { - key.addValue(keyStr.getBytes(QueryConstants.CHARSET)); - } catch (UnsupportedEncodingException e) { - LOG.error(e.getMessage(),e); - } - } - GroupbyValue value = new GroupbyValue(); - for(Function f : entry.getValue()){ - value.add(f.result()); - 
value.addMeta(f.count()); - } - results.add(new GroupbyKeyValue(key,value)); - } - - return results; - } - - public static interface FunctionFactory{ - public Function createFunction(); - } - - public static abstract class Function{ - protected int count; - - public abstract void run(double v); - public abstract double result(); - public int count(){ - return count; - } - public void incrCount(){ - count ++; - } - } - - private static class CountFactory implements FunctionFactory{ - @Override - public Function createFunction(){ - return new Count(); - } - } - - - private static class Count extends Sum{ - public Count(){ - super(); - } - } - - private static class SumFactory implements FunctionFactory{ - @Override - public Function createFunction(){ - return new Sum(); - } - } - - private static class Sum extends Function{ - private double summary; - public Sum(){ - this.summary = 0.0; - } - @Override - public void run(double v){ - this.incrCount(); - this.summary += v; - } - - @Override - public double result(){ - return this.summary; - } - } - - private static class MinFactory implements FunctionFactory{ - @Override - public Function createFunction(){ - return new Min(); - } - } - public static class Min extends Function{ - private double minimum; - public Min(){ - // TODO is this a bug, or only positive numeric calculation is supported - this.minimum = Double.MAX_VALUE; - } - - @Override - public void run(double v){ - if(v < minimum){ - minimum = v; - } - this.incrCount(); - } - - @Override - public double result(){ - return minimum; - } - } - - private static class MaxFactory implements FunctionFactory{ - @Override - public Function createFunction(){ - return new Max(); - } - } - public static class Max extends Function{ - private double maximum; - public Max(){ - // TODO is this a bug, or only positive numeric calculation is supported - this.maximum = 0.0; - } - @Override - public void run(double v){ - if(v > maximum){ - maximum = v; - } - this.incrCount(); - } - - @Override - public double result(){ - return maximum; - } - } - - private static class AvgFactory implements FunctionFactory{ - @Override - public Function createFunction(){ - return new Avg(); - } - } - public static class Avg extends Function{ - private double total; - public Avg(){ - this.total = 0.0; - } - @Override - public void run(double v){ - total += v; - this.incrCount(); - } - @Override - public double result(){ - return this.total/this.count; - } - } + private static final Logger LOG = LoggerFactory.getLogger(GroupbyBucket.class); + + public static Map<String, FunctionFactory> functionFactories = new HashMap<>(); + + // TODO put this logic to AggregatorFunctionType + static { + functionFactories.put(AggregateFunctionType.count.name(), new CountFactory()); + functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory()); + functionFactories.put(AggregateFunctionType.min.name(), new MinFactory()); + functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory()); + functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory()); + } + + private List<AggregateFunctionType> types; + // private SortedMap<List<String>, List<Function>> group2FunctionMap = + // new TreeMap<List<String>, List<Function>>(new GroupbyFieldsComparator()); + + private Map<List<String>, List<Function>> group2FunctionMap = new HashMap<>(); //new GroupbyFieldsComparator()); + + public GroupbyBucket(List<AggregateFunctionType> types) { + this.types = types; + } + + public void addDatapoint(List<String> 
groupbyFieldValues, List<Double> values) { + // LOG.info("DEBUG: addDatapoint: groupby =["+StringUtils.join(groupbyFieldValues,",")+"], values=["+StringUtils.join(values, ",")+"]"); + + // locate groupby bucket + List<Function> functions = group2FunctionMap.get(groupbyFieldValues); + if (functions == null) { + functions = new ArrayList<Function>(); + for (AggregateFunctionType type : types) { + functions.add(functionFactories.get(type.name()).createFunction()); + } + group2FunctionMap.put(groupbyFieldValues, functions); + } + int functionIndex = 0; + for (Double v : values) { + functions.get(functionIndex).run(v); + functionIndex++; + } + } + + public Map<List<String>, List<Double>> result() { + Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>(); + for (Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()) { + List<Double> values = new ArrayList<Double>(); + for (Function f : entry.getValue()) { + values.add(f.result()); + } + result.put(entry.getKey(), values); + } + return result; + } + + public List<GroupbyKeyValue> getGroupbyKeyValue() { + List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>(); + + for (Map.Entry<List<String>, List<Function>> entry : this.group2FunctionMap.entrySet()) { + GroupbyKey key = new GroupbyKey(); + for (String keyStr:entry.getKey()) { + try { + key.addValue(keyStr.getBytes(QueryConstants.CHARSET)); + } catch (UnsupportedEncodingException e) { + LOG.error(e.getMessage(),e); + } + } + GroupbyValue value = new GroupbyValue(); + for (Function f : entry.getValue()) { + value.add(f.result()); + value.addMeta(f.count()); + } + results.add(new GroupbyKeyValue(key,value)); + } + + return results; + } + + public static interface FunctionFactory { + public Function createFunction(); + } + + public abstract static class Function { + protected int count; + + public abstract void run(double v); + + public abstract double result(); + + public int count() { + return count; + } + + public void incrCount() { + count ++; + } + } + + private static class CountFactory implements FunctionFactory { + + @Override + public Function createFunction() { + return new Count(); + } + } + + + private static class Count extends Sum { + + public Count() { + super(); + } + } + + private static class SumFactory implements FunctionFactory { + + @Override + public Function createFunction() { + return new Sum(); + } + } + + private static class Sum extends Function { + private double summary; + + public Sum() { + this.summary = 0.0; + } + + @Override + public void run(double v) { + this.incrCount(); + this.summary += v; + } + + @Override + public double result() { + return this.summary; + } + } + + private static class MinFactory implements FunctionFactory { + + @Override + public Function createFunction() { + return new Min(); + } + } + + public static class Min extends Function { + private double minimum; + + public Min() { + // TODO is this a bug, or only positive numeric calculation is supported + this.minimum = Double.MAX_VALUE; + } + + @Override + public void run(double v) { + if (v < minimum) { + minimum = v; + } + this.incrCount(); + } + + @Override + public double result() { + return minimum; + } + } + + private static class MaxFactory implements FunctionFactory { + + @Override + public Function createFunction() { + return new Max(); + } + } + + public static class Max extends Function { + private double maximum; + + public Max() { + // TODO is this a bug, or only positive numeric calculation is supported + 
this.maximum = 0.0; + } + + @Override + public void run(double v) { + if (v > maximum) { + maximum = v; + } + this.incrCount(); + } + + @Override + public double result() { + return maximum; + } + } + + private static class AvgFactory implements FunctionFactory { + + @Override + public Function createFunction() { + return new Avg(); + } + } + + public static class Avg extends Function { + private double total; + + public Avg() { + this.total = 0.0; + } + + @Override + public void run(double v) { + total += v; + this.incrCount(); + } + + @Override + public double result() { + return this.total / this.count; + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java index 6635483..b612ccf 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/GroupbyFieldsComparator.java @@ -22,18 +22,21 @@ import java.util.List; /** * this is default comparator for aggregation. The behavior is to sort by groupby fields ascendantly */ -public class GroupbyFieldsComparator implements Comparator<List<String>>{ - @Override - public int compare(List<String> list1, List<String> list2){ - if(list1 == null || list2 == null || list1.size() != list2.size()) - throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size"); - int r = 0; - int index = 0; - for(String s1 : list1){ - r = s1.compareTo(list2.get(index++)); - if(r != 0) - return r; - } - return r; - } +public class GroupbyFieldsComparator implements Comparator<List<String>> { + + @Override + public int compare(List<String> list1, List<String> list2) { + if (list1 == null || list2 == null || list1.size() != list2.size()) { + throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size"); + } + int r = 0; + int index = 0; + for (String s1 : list1) { + r = s1.compareTo(list2.get(index++)); + if (r != 0) { + return r; + } + } + return r; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java index 341fa00..559061b 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregateEntity.java @@ -25,43 +25,52 @@ import java.util.TreeMap; import com.fasterxml.jackson.databind.annotation.JsonSerialize; 
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) public class HierarchicalAggregateEntity { - private String key; - private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>(); - private List<Double> values = new ArrayList<Double>(); - private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>(); - private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null; + private String key; + private List<GroupbyBucket.Function> tmpValues = new ArrayList<GroupbyBucket.Function>(); + private List<Double> values = new ArrayList<Double>(); + private SortedMap<String, HierarchicalAggregateEntity> children = new TreeMap<String, HierarchicalAggregateEntity>(); + private SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList = null; - public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() { - return sortedList; - } - public void setSortedList( - SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) { - this.sortedList = sortedList; - } - public List<GroupbyBucket.Function> getTmpValues() { - return tmpValues; - } - public void setTmpValues(List<GroupbyBucket.Function> tmpValues) { - this.tmpValues = tmpValues; - } - public String getKey() { - return key; - } - public void setKey(String key) { - this.key = key; - } - public List<Double> getValues() { - return values; - } - public void setValues(List<Double> values) { - this.values = values; - } - public SortedMap<String, HierarchicalAggregateEntity> getChildren() { - return children; - } - public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) { - this.children = children; - } + public SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> getSortedList() { + return sortedList; + } + + public void setSortedList( + SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedList) { + this.sortedList = sortedList; + } + + public List<GroupbyBucket.Function> getTmpValues() { + return tmpValues; + } + + public void setTmpValues(List<GroupbyBucket.Function> tmpValues) { + this.tmpValues = tmpValues; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public List<Double> getValues() { + return values; + } + + public void setValues(List<Double> values) { + this.values = values; + } + + public SortedMap<String, HierarchicalAggregateEntity> getChildren() { + return children; + } + + public void setChildren(SortedMap<String, HierarchicalAggregateEntity> children) { + this.children = children; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java index ecb80ac..8751a74 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/HierarchicalAggregator.java @@ -22,61 +22,61 @@ import 
java.util.SortedMap; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.query.aggregate.AggregateFunctionType; -public class HierarchicalAggregator extends AbstractAggregator{ - private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity(); +public class HierarchicalAggregator extends AbstractAggregator { + private HierarchicalAggregateEntity root = new HierarchicalAggregateEntity(); - public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){ - super(groupbyFields, aggregateFuntionTypes, aggregatedFields); - } + public HierarchicalAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) { + super(groupbyFields, aggregateFuntionTypes, aggregatedFields); + } - public void accumulate(TaggedLogAPIEntity entity) throws Exception{ - List<Double> preAggregatedValues = createPreAggregatedValues(entity); - // aggregate to root first - addDatapoint(root, preAggregatedValues); - // go through hierarchical tree - HierarchicalAggregateEntity current = root; - int i = 0; - for(String groupbyField : groupbyFields){ - // determine groupbyFieldValue from tag or fields - String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i); - SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren(); - if(children.get(groupbyFieldValue) == null){ - HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity(); - children.put(groupbyFieldValue, tmp); - } - children.get(groupbyFieldValue).setKey(groupbyFieldValue); - addDatapoint(children.get(groupbyFieldValue), preAggregatedValues); - current = children.get(groupbyFieldValue); - } - } + public void accumulate(TaggedLogAPIEntity entity) throws Exception { + List<Double> preAggregatedValues = createPreAggregatedValues(entity); + // aggregate to root first + addDatapoint(root, preAggregatedValues); + // go through hierarchical tree + HierarchicalAggregateEntity current = root; + int i = 0; + for (String groupbyField : groupbyFields) { + // determine groupbyFieldValue from tag or fields + String groupbyFieldValue = determineGroupbyFieldValue(entity, groupbyField, i); + SortedMap<String, HierarchicalAggregateEntity> children = current.getChildren(); + if (children.get(groupbyFieldValue) == null) { + HierarchicalAggregateEntity tmp = new HierarchicalAggregateEntity(); + children.put(groupbyFieldValue, tmp); + } + children.get(groupbyFieldValue).setKey(groupbyFieldValue); + addDatapoint(children.get(groupbyFieldValue), preAggregatedValues); + current = children.get(groupbyFieldValue); + } + } - private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values){ - List<GroupbyBucket.Function> functions = entity.getTmpValues(); - // initialize list of function - if(functions.isEmpty()){ - for(AggregateFunctionType type : aggregateFunctionTypes){ - functions.add(GroupbyBucket._functionFactories.get(type.name()).createFunction()); - } - } - int functionIndex = 0; - for(Double v : values){ - functions.get(functionIndex).run(v); - functionIndex++; - } - } + private void addDatapoint(HierarchicalAggregateEntity entity, List<Double> values) { + List<GroupbyBucket.Function> functions = entity.getTmpValues(); + // initialize list of function + if (functions.isEmpty()) { + for (AggregateFunctionType type : aggregateFunctionTypes) { + functions.add(GroupbyBucket.functionFactories.get(type.name()).createFunction()); + } + } + int 
functionIndex = 0; + for (Double v : values) { + functions.get(functionIndex).run(v); + functionIndex++; + } + } - private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity){ - for(GroupbyBucket.Function f : entity.getTmpValues()){ - entity.getValues().add(f.result()); - } - for(HierarchicalAggregateEntity child : entity.getChildren().values()){ - finalizeHierarchicalAggregateEntity(child); - } - entity.setTmpValues(null); - } + private void finalizeHierarchicalAggregateEntity(HierarchicalAggregateEntity entity) { + for (GroupbyBucket.Function f : entity.getTmpValues()) { + entity.getValues().add(f.result()); + } + for (HierarchicalAggregateEntity child : entity.getChildren().values()) { + finalizeHierarchicalAggregateEntity(child); + } + entity.setTmpValues(null); + } - public HierarchicalAggregateEntity result(){ - finalizeHierarchicalAggregateEntity(root); - return this.root; - } + public HierarchicalAggregateEntity result() { + finalizeHierarchicalAggregateEntity(root); + return this.root; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java index f62d2c2..8ca24c6 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostFlatAggregateSort.java @@ -24,70 +24,74 @@ import java.util.SortedSet; import java.util.TreeSet; public class PostFlatAggregateSort { - private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) { - SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions)); - sortedEntries.addAll(map.entrySet()); - return sortedEntries; - } + private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue(Map<List<String>, List<Double>> map, List<SortOption> sortOptions) { + SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>(new MapEntryComparator(sortOptions)); + sortedEntries.addAll(map.entrySet()); + return sortedEntries; + } - /** - * sort aggregated results with sort options - * @param aggregatedResult aggregated result set, but it is not sorted - * @sortOptions sorting options - * @topN top N results will be returned if topN is specified. 
If it's not specified (as default value 0), all results will be returned - */ - public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN){ - SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions); - List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>(); - for (Map.Entry<List<String>, List<Double>> entry : allList) { - result.add(entry); - if (topN > 0 && result.size() >= topN) { - break; - } - } - return result; - } + /** + * sort aggregated results with sort options + * @param aggregatedResult aggregated result set, but it is not sorted + * @sortOptions sorting options + * @topN top N results will be returned if topN is specified. If it's not specified (as default value 0), all results will be returned + */ + public static List<Map.Entry<List<String>, List<Double>>> sort(Map<List<String>, List<Double>> aggregatedResult, List<SortOption> sortOptions, int topN) { + SortedSet<Map.Entry<List<String>, List<Double>>> allList = sortByValue(aggregatedResult, sortOptions); + List<Map.Entry<List<String>, List<Double>>> result = new ArrayList<Map.Entry<List<String>, List<Double>>>(); + for (Map.Entry<List<String>, List<Double>> entry : allList) { + result.add(entry); + if (topN > 0 && result.size() >= topN) { + break; + } + } + return result; + } + + private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>> { + private List<SortOption> sortOptions; + + public MapEntryComparator(List<SortOption> sortOptions) { + this.sortOptions = sortOptions; + } - private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>>{ - private List<SortOption> sortOptions; - public MapEntryComparator(List<SortOption> sortOptions){ - this.sortOptions = sortOptions; - } - /** - * default to sort by all groupby fields - */ - @Override - public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2){ - int r = 0; - List<String> keyList1 = e1.getKey(); - List<Double> valueList1 = e1.getValue(); - List<String> keyList2 = e2.getKey(); - List<Double> valueList2 = e2.getValue(); - for(SortOption so : sortOptions){ - int index = so.getIndex(); - if (index == -1) { - continue; - } - if(!so.isInGroupby()){ // sort fields come from functions - Double value1 = valueList1.get(index); - Double value2 = valueList2.get(index); - r = value1.compareTo(value2); - }else{ // sort fields come from groupby fields - String key1 = keyList1.get(index); - String key2 = keyList2.get(index); - r = key1.compareTo(key2); - } - if(r == 0) continue; - if(!so.isAscendant()){ - r = -r; - } - return r; - } - // default to sort by groupby fields ascendently - if(r ==0){ // TODO is this check necessary - return new GroupbyFieldsComparator().compare(keyList1, keyList2); - } - return r; + /** + * default to sort by all groupby fields + */ + @Override + public int compare(Map.Entry<List<String>, List<Double>> e1, Map.Entry<List<String>, List<Double>> e2) { + int r = 0; + List<String> keyList1 = e1.getKey(); + List<Double> valueList1 = e1.getValue(); + List<String> keyList2 = e2.getKey(); + List<Double> valueList2 = e2.getValue(); + for (SortOption so : sortOptions) { + int index = so.getIndex(); + if (index == -1) { + continue; + } + if (!so.isInGroupby()) { // sort fields come from functions + Double value1 = valueList1.get(index); + Double value2 = 
valueList2.get(index); + r = value1.compareTo(value2); + } else { // sort fields come from groupby fields + String key1 = keyList1.get(index); + String key2 = keyList2.get(index); + r = key1.compareTo(key2); + } + if (r == 0) { + continue; + } + if (!so.isAscendant()) { + r = -r; + } + return r; + } + // default to sort by groupby fields ascendently + if (r == 0) { // TODO is this check necessary + return new GroupbyFieldsComparator().compare(keyList1, keyList2); + } + return r; } - } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java index 7b0997b..bd475f9 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/PostHierarchicalAggregateSort.java @@ -24,69 +24,71 @@ import java.util.TreeSet; public class PostHierarchicalAggregateSort { - private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) { - SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions)); - sortedEntries.addAll(entity.getChildren().entrySet()); - return sortedEntries; - } + private static SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortByValue(HierarchicalAggregateEntity entity, List<SortOption> sortOptions) { + SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> sortedEntries = new TreeSet<Map.Entry<String, HierarchicalAggregateEntity>>(new MapEntryComparator(sortOptions)); + sortedEntries.addAll(entity.getChildren().entrySet()); + return sortedEntries; + } - /** - * sort aggregated results with sort options + /** + * sort aggregated results with sort options * * @param result * @param sortOptions * @return */ - public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions){ - SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions); - result.setSortedList(tmp); - result.setChildren(null); - for(Map.Entry<String, HierarchicalAggregateEntity> entry : tmp){ - sort(entry.getValue(), sortOptions); - } - return result; - } + public static HierarchicalAggregateEntity sort(HierarchicalAggregateEntity result, List<SortOption> sortOptions) { + SortedSet<Map.Entry<String, HierarchicalAggregateEntity>> tmp = sortByValue(result, sortOptions); + result.setSortedList(tmp); + result.setChildren(null); + for (Map.Entry<String, HierarchicalAggregateEntity> entry : tmp) { + sort(entry.getValue(), sortOptions); + } + return result; + } + + private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>> { + private List<SortOption> sortOptions; - private static class MapEntryComparator implements Comparator<Map.Entry<String, HierarchicalAggregateEntity>>{ - private List<SortOption> sortOptions; + public 
MapEntryComparator(List<SortOption> sortOptions) { + this.sortOptions = sortOptions; + } - public MapEntryComparator(List<SortOption> sortOptions){ - this.sortOptions = sortOptions; - } + /** + * default to sort by all groupby fields + */ + @Override + public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2) { + int r = 0; + String key1 = e1.getKey(); + List<Double> valueList1 = e1.getValue().getValues(); + String key2 = e2.getKey(); + List<Double> valueList2 = e2.getValue().getValues(); + for (SortOption so : sortOptions) { + int index = so.getIndex(); + if (index == -1) { + continue; + } + if (!so.isInGroupby()) { // sort fields come from functions + Double value1 = valueList1.get(index); + Double value2 = valueList2.get(index); + r = value1.compareTo(value2); + } + // sort fields come from groupby fields, then silently ignored - /** - * default to sort by all groupby fields - */ - @Override - public int compare(Map.Entry<String, HierarchicalAggregateEntity> e1, Map.Entry<String, HierarchicalAggregateEntity> e2){ - int r = 0; - String key1 = e1.getKey(); - List<Double> valueList1 = e1.getValue().getValues(); - String key2 = e2.getKey(); - List<Double> valueList2 = e2.getValue().getValues(); - for(SortOption so : sortOptions){ - int index = so.getIndex(); - if (index == -1) { - continue; - } - if(!so.isInGroupby()){ // sort fields come from functions - Double value1 = valueList1.get(index); - Double value2 = valueList2.get(index); - r = value1.compareTo(value2); - } - // sort fields come from groupby fields, then silently ignored - - if(r == 0) continue; - if(!so.isAscendant()){ - r = -r; - } - return r; - } - // default to sort by groupby fields ascendently - if(r ==0){ - return key1.compareTo(key2); - } - return r; + if (r == 0) { + continue; + } + if (!so.isAscendant()) { + r = -r; + } + return r; + } + // default to sort by groupby fields ascendently + if (r == 0) { + return key1.compareTo(key2); + } + return r; } - } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java index d1578ac..c848122 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOption.java @@ -19,31 +19,36 @@ package org.apache.eagle.query.aggregate.timeseries; /** * sum(field1), max(field2) groupby(field3, field4) sort by field1 asc, field3 desc * There are 2 SortOption object, then - * the 1st one is inGroupby=false, index=0, ascendent=true - * the 2nd one is inGroupby=true, index=1, ascendent=false + * the 1st one is inGroupby = false, index=0, ascendent=true + * the 2nd one is inGroupby = true, index=1, ascendent=false * */ public class SortOption { - private boolean inGroupby; // sort field defaultly is not from groupby fields - private int index; // index relative to list of groupby fields or list of functions - private boolean ascendant; //asc or desc + private boolean inGroupby; // sort field defaultly is not from groupby fields + 
private int index; // index relative to list of groupby fields or list of functions + private boolean ascendant; //asc or desc - public boolean isInGroupby() { - return inGroupby; - } - public void setInGroupby(boolean inGroupby) { - this.inGroupby = inGroupby; - } - public int getIndex() { - return index; - } - public void setIndex(int index) { - this.index = index; - } - public boolean isAscendant() { - return ascendant; - } - public void setAscendant(boolean ascendant) { - this.ascendant = ascendant; - } + public boolean isInGroupby() { + return inGroupby; + } + + public void setInGroupby(boolean inGroupby) { + this.inGroupby = inGroupby; + } + + public int getIndex() { + return index; + } + + public void setIndex(int index) { + this.index = index; + } + + public boolean isAscendant() { + return ascendant; + } + + public void setAscendant(boolean ascendant) { + this.ascendant = ascendant; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java index 1360e0c..2457b4e 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SortOptionsParser.java @@ -25,45 +25,45 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class SortOptionsParser { - private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class); - private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$"); - - public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields){ - List<SortOption> list = new ArrayList<SortOption>(); - for(String sortOption : sortOptions){ - Matcher m = pattern.matcher(sortOption); - if(!m.find()){ - throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc"); - } - String field = m.group(1); - if (sortFields != null) { - sortFields.add(field); - } - SortOption so = new SortOption(); - list.add(so); - so.setAscendant(m.group(2).equals("asc") ? 
true : false); - int index = aggregatedFields.indexOf(field); - if(index > -1){ - so.setInGroupby(false); - so.setIndex(index); - continue; - } - if(groupbyFields != null){ // if groupbyFields is not provided, ignore this sort field - index = groupbyFields.indexOf(field); - if(index > -1){ - so.setInGroupby(true); - so.setIndex(index); - continue; - } - } - logNonExistingSortByField(field); - so.setInGroupby(false); - so.setIndex(-1); - } - return list; - } - - private static void logNonExistingSortByField(String sortByField){ - LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField); - } + private static final Logger LOG = LoggerFactory.getLogger(SortOptionsParser.class); + private static Pattern pattern = Pattern.compile("^(.+)\\s+(asc|desc)$"); + + public static List<SortOption> parse(List<String> groupbyFields, List<String> aggregatedFields, List<String> sortOptions, List<String> sortFields) { + List<SortOption> list = new ArrayList<SortOption>(); + for (String sortOption : sortOptions) { + Matcher m = pattern.matcher(sortOption); + if (!m.find()) { + throw new IllegalArgumentException("sort option must have the format of <groupbyfield|function> asc|desc"); + } + String field = m.group(1); + if (sortFields != null) { + sortFields.add(field); + } + SortOption so = new SortOption(); + list.add(so); + so.setAscendant(m.group(2).equals("asc") ? true : false); + int index = aggregatedFields.indexOf(field); + if (index > -1) { + so.setInGroupby(false); + so.setIndex(index); + continue; + } + if (groupbyFields != null) { // if groupbyFields is not provided, ignore this sort field + index = groupbyFields.indexOf(field); + if (index > -1) { + so.setInGroupby(true); + so.setIndex(index); + continue; + } + } + logNonExistingSortByField(field); + so.setInGroupby(false); + so.setIndex(-1); + } + return list; + } + + private static void logNonExistingSortByField(String sortByField) { + LOG.warn("Sortby field is neither in aggregated fields or groupby fields, ignore " + sortByField); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java index d8b781e..f4eabcd 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedAggregator.java @@ -18,18 +18,18 @@ package org.apache.eagle.query.aggregate.timeseries; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; -public class SynchronizedAggregator implements Aggregator{ - private Object mutex = new Object(); - private Aggregator agg; - - public SynchronizedAggregator(Aggregator agg){ - this.agg = agg; - } - - @Override - public void accumulate(TaggedLogAPIEntity entity) throws Exception{ - synchronized(mutex){ - agg.accumulate(entity); - } - } -} +public class SynchronizedAggregator implements Aggregator { + private Object mutex = new Object(); + private Aggregator agg; + + public SynchronizedAggregator(Aggregator agg) { + this.agg = agg; + } + + 
@Override + public void accumulate(TaggedLogAPIEntity entity) throws Exception { + synchronized (mutex) { + agg.accumulate(entity); + } + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java index 7c1412e..baa89be 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/SynchronizedEntityCreationListener.java @@ -19,18 +19,18 @@ package org.apache.eagle.query.aggregate.timeseries; import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity; import org.apache.eagle.log.entity.EntityCreationListener; -public class SynchronizedEntityCreationListener implements EntityCreationListener{ - private Object mutex = new Object(); - private EntityCreationListener listener; - - public SynchronizedEntityCreationListener(EntityCreationListener listener){ - this.listener = listener; - } - - @Override - public void entityCreated(TaggedLogAPIEntity entity) throws Exception{ - synchronized(mutex){ - listener.entityCreated(entity); - } - } +public class SynchronizedEntityCreationListener implements EntityCreationListener { + private Object mutex = new Object(); + private EntityCreationListener listener; + + public SynchronizedEntityCreationListener(EntityCreationListener listener) { + this.listener = listener; + } + + @Override + public void entityCreated(TaggedLogAPIEntity entity) throws Exception { + synchronized (mutex) { + listener.entityCreated(entity); + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java index 5bebe13..e142657 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesAggregator.java @@ -29,7 +29,7 @@ import java.util.List; import java.util.Map; /** - * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would + * TODO Assuming that data point comes in the sequence of occurrence time desc or asc would * save memory for holding all the data in the memory * * <h3>Aggregate Bucket Structure</h3> @@ -41,129 +41,135 @@ import java.util.Map; * */ public class TimeSeriesAggregator extends FlatAggregator implements GroupbyKeyAggregatable { - private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class); - private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000; - private long startTime; 
- private long endTime; - private long intervalms; - private int numFunctions; - private int ignoredEntityCounter = 0; - - public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields, - long startTime, long endTime, long intervalms){ - super(groupbyFields, aggregateFuntionTypes, aggregatedFields); - // guard to avoid too many data points returned -// validateTimeRange(startTime, endTime, intervalms); - this.startTime = startTime; - this.endTime = endTime; - this.intervalms = intervalms; - this.numFunctions = aggregateFuntionTypes.size(); - } + private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesAggregator.class); + private static final int DEFAULT_DATAPOINT_MAX_COUNT = 1000; + private long startTime; + private long endTime; + private long intervalms; + private int numFunctions; + private int ignoredEntityCounter = 0; -// @Deprecated -// public static void validateTimeRange(long startTime, long endTime, long intervalms){ -// if(startTime >= endTime || intervalms <= 0){ -// throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms); -// } -// if((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT){ -// throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT + ", current # of datapoints is " + (endTime-startTime)/intervalms); -// } -// } - - public void accumulate(TaggedLogAPIEntity entity) throws Exception{ - List<String> groupbyFieldValues = createGroup(entity); - // TODO: make sure timestamp be in range of this.startTime to this.endTime in outer side - // guard the time range to avoid to accumulate entities whose timestamp is bigger than endTime - if(entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime){ - if(LOG.isDebugEnabled()) LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime); - this.ignoredEntityCounter ++; - return; - } - // time series bucket index - long located =(entity.getTimestamp() - startTime)/intervalms; - groupbyFieldValues.add(String.valueOf(located)); - List<Double> preAggregatedValues = createPreAggregatedValues(entity); - bucket.addDatapoint(groupbyFieldValues, preAggregatedValues); - } - - public Map<List<String>, List<Double>> result(){ - if(this.ignoredEntityCounter > 0) - LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime); - return bucket.result(); - } + public TimeSeriesAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields, + long startTime, long endTime, long intervalms) { + super(groupbyFields, aggregateFuntionTypes, aggregatedFields); + // guard to avoid too many data points returned + // validateTimeRange(startTime, endTime, intervalms); + this.startTime = startTime; + this.endTime = endTime; + this.intervalms = intervalms; + this.numFunctions = aggregateFuntionTypes.size(); + } - /** - * Support new aggregate result - * - * @return - */ - @Override - public List<GroupbyKeyValue> getGroupbyKeyValues(){ - if(this.ignoredEntityCounter > 0) - LOG.warn("Ignored "+this.ignoredEntityCounter+" entities for reason: timestamp > "+this.endTime+" or < "+this.startTime); - 
return bucket.getGroupbyKeyValue(); - } - - public Map<List<String>, List<double[]>> getMetric(){ - // groupbyfields+timeseriesbucket --> aggregatedvalues for different function - Map<List<String>, List<Double>> result = bucket.result(); -// Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>(); -// /** -// * bug fix: startTime is inclusive and endTime is exclusive -// */ -//// int numDatapoints =(int)((endTime-startTime)/intervalms + 1); -// int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1); -// for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){ -// // get groups -// List<String> groupbyFields = entry.getKey(); -// List<String> copy = new ArrayList<String>(groupbyFields); -// String strTimeseriesIndex = copy.remove(copy.size()-1); -// List<double[]> functionValues = timeseriesDatapoints.get(copy); -// if(functionValues == null){ -// functionValues = new ArrayList<double[]>(); -// timeseriesDatapoints.put(copy, functionValues); -// for(int i=0; i<numFunctions; i++){ -// functionValues.add(new double[numDatapoints]); -// } -// } -// int timeseriesIndex = Integer.valueOf(strTimeseriesIndex); -// int functionIndex = 0; -// for(double[] values : functionValues){ -// values[timeseriesIndex] = entry.getValue().get(functionIndex); -// functionIndex++; -// } -// } -// return timeseriesDatapoints; - return toMetric(result,(int)((endTime-1-startTime)/intervalms + 1),this.numFunctions); - } + // @Deprecated + // public static void validateTimeRange(long startTime, long endTime, long intervalms) { + // if (startTime >= endTime || intervalms <= 0) { + // throw new IllegalArgumentException("invalid argument, startTime should be less than endTime and " + // + "interval must be greater than 0, starTime is " + startTime + " and endTime is " + endTime + ", interval is " + intervalms); + // } + // if ((endTime-startTime)/intervalms > DEFAULT_DATAPOINT_MAX_COUNT) { + // throw new IllegalArgumentException("invalid argument, # of datapoints should be less than " + DEFAULT_DATAPOINT_MAX_COUNT + // + ", current # of datapoints is " + (endTime-startTime)/intervalms); + // } + // } - public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions){ - Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>(); - /** - * bug fix: startTime is inclusive and endTime is exclusive - */ -// int numDatapoints =(int)((endTime-startTime)/intervalms + 1); -// int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1); - for(Map.Entry<List<String>, List<Double>> entry : result.entrySet()){ - // get groups - List<String> groupbyFields = entry.getKey(); - List<String> copy = new ArrayList<String>(groupbyFields); - String strTimeseriesIndex = copy.remove(copy.size()-1); - List<double[]> functionValues = timeseriesDatapoints.get(copy); - if(functionValues == null){ - functionValues = new ArrayList<double[]>(); - timeseriesDatapoints.put(copy, functionValues); - for(int i=0; i<numFunctions; i++){ - functionValues.add(new double[numDatapoints]); - } - } - int timeseriesIndex = Integer.valueOf(strTimeseriesIndex); - int functionIndex = 0; - for(double[] values : functionValues){ - values[timeseriesIndex] = entry.getValue().get(functionIndex); - functionIndex++; - } - } - return timeseriesDatapoints; - } + public void accumulate(TaggedLogAPIEntity entity) throws Exception { + List<String> groupbyFieldValues = createGroup(entity); + // TODO: make 
sure timestamp be in range of this.startTime to this.endTime in outer side + // guard the time range to avoid to accumulate entities whose timestamp is bigger than endTime + if (entity.getTimestamp() >= this.endTime || entity.getTimestamp() < this.startTime) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignore in-coming entity whose timestamp > endTime or < startTime, timestamp: " + entity.getTimestamp() + ", startTime:" + startTime + ", endTime:" + endTime); + } + this.ignoredEntityCounter ++; + return; + } + // time series bucket index + long located = (entity.getTimestamp() - startTime) / intervalms; + groupbyFieldValues.add(String.valueOf(located)); + List<Double> preAggregatedValues = createPreAggregatedValues(entity); + bucket.addDatapoint(groupbyFieldValues, preAggregatedValues); + } + + public Map<List<String>, List<Double>> result() { + if (this.ignoredEntityCounter > 0) { + LOG.warn("Ignored " + this.ignoredEntityCounter + " entities for reason: timestamp > " + this.endTime + " or < " + this.startTime); + } + return bucket.result(); + } + + /** + * Support new aggregate result + * + * @return + */ + @Override + public List<GroupbyKeyValue> getGroupbyKeyValues() { + if (this.ignoredEntityCounter > 0) { + LOG.warn("Ignored " + this.ignoredEntityCounter + " entities for reason: timestamp > " + this.endTime + " or < " + this.startTime); + } + return bucket.getGroupbyKeyValue(); + } + + public Map<List<String>, List<double[]>> getMetric() { + // groupbyfields+timeseriesbucket --> aggregatedvalues for different function + Map<List<String>, List<Double>> result = bucket.result(); + // Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>(); + // /** + // * bug fix: startTime is inclusive and endTime is exclusive + // */ + //// int numDatapoints =(int)((endTime-startTime)/intervalms + 1); + // int numDatapoints =(int)((endTime-1-startTime)/intervalms + 1); + // for (Map.Entry<List<String>, List<Double>> entry : result.entrySet()) { + // // get groups + // List<String> groupbyFields = entry.getKey(); + // List<String> copy = new ArrayList<String>(groupbyFields); + // String strTimeseriesIndex = copy.remove(copy.size()-1); + // List<double[]> functionValues = timeseriesDatapoints.get(copy); + // if (functionValues == null) { + // functionValues = new ArrayList<double[]>(); + // timeseriesDatapoints.put(copy, functionValues); + // for (int i = 0; i<numFunctions; i++) { + // functionValues.add(new double[numDatapoints]); + // } + // } + // int timeseriesIndex = Integer.valueOf(strTimeseriesIndex); + // int functionIndex = 0; + // for (double[] values : functionValues) { + // values[timeseriesIndex] = entry.getValue().get(functionIndex); + // functionIndex++; + // } + // } + // return timeseriesDatapoints; + return toMetric(result,(int)((endTime - 1 - startTime) / intervalms + 1), this.numFunctions); + } + + public static Map<List<String>, List<double[]>> toMetric(Map<List<String>, List<Double>> result,int numDatapoints,int numFunctions) { + Map<List<String>, List<double[]>> timeseriesDatapoints = new HashMap<List<String>, List<double[]>>(); + /** + * bug fix: startTime is inclusive and endTime is exclusive + */ + // int numDatapoints = (int)((endTime-startTime)/intervalms + 1); + // int numDatapoints = (int)((endTime-1-startTime)/intervalms + 1); + for (Map.Entry<List<String>, List<Double>> entry : result.entrySet()) { + // get groups + List<String> groupbyFields = entry.getKey(); + List<String> copy = new ArrayList<String>(groupbyFields); + String 
strTimeseriesIndex = copy.remove(copy.size() - 1); + List<double[]> functionValues = timeseriesDatapoints.get(copy); + if (functionValues == null) { + functionValues = new ArrayList<double[]>(); + timeseriesDatapoints.put(copy, functionValues); + for (int i = 0; i < numFunctions; i++) { + functionValues.add(new double[numDatapoints]); + } + } + int timeseriesIndex = Integer.valueOf(strTimeseriesIndex); + int functionIndex = 0; + for (double[] values : functionValues) { + values[timeseriesIndex] = entry.getValue().get(functionIndex); + functionIndex++; + } + } + return timeseriesDatapoints; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java index d662658..78fa010 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesBucket.java @@ -26,51 +26,51 @@ import org.slf4j.LoggerFactory; * only numeric aggregation is supported and number type supported is double */ public class TimeSeriesBucket { - private final static Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class); - private long startTime; - private long endTime; - private long interval; - - // map of aggregation function to aggregated values - List<double[]> aggregatedValues = new ArrayList<double[]>(); - - // align from the startTime - /** - * - * @param startTime milliseconds - * @param endTime milliseconds - * @param intervalMillseconds - * @param aggFunctions - */ - public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions){ - int count =(int)((endTime-startTime)/intervalms); - for(int i=0; i<numAggFunctions; i++){ - aggregatedValues.add(new double[count]); - } - } - - /** - * add datapoint which has a list of values for different aggregate functions - * for example, sum(numHosts), count(*), avg(timespan) etc - * @param timestamp - * @param values - */ - public void addDataPoint(long timestamp, List<Double> values){ - // locate timeseries bucket - if(timestamp < startTime || timestamp > endTime){ - LOG.warn("timestamp<startTime or timestamp>endTime, ignore this datapoint." 
+ timestamp + "," + startTime + ":" + endTime); - return; - } - int located =(int)((timestamp - startTime)/interval); - int index = 0; - for(Double src : values){ - double[] timeSeriesValues = aggregatedValues.get(index); - timeSeriesValues[located] += src; - index++; - } - } - - public List<double[]> aggregatedValues(){ - return this.aggregatedValues; - } + private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesBucket.class); + private long startTime; + private long endTime; + private long interval; + + // map of aggregation function to aggregated values + List<double[]> aggregatedValues = new ArrayList<double[]>(); + + // align from the startTime + /** + * + * @param startTime milliseconds + * @param endTime milliseconds + * @param intervalMillseconds + * @param aggFunctions + */ + public TimeSeriesBucket(long startTime, long endTime, long intervalms, int numAggFunctions) { + int count = (int)((endTime - startTime) / intervalms); + for (int i = 0; i < numAggFunctions; i++) { + aggregatedValues.add(new double[count]); + } + } + + /** + * add datapoint which has a list of values for different aggregate functions + * for example, sum(numHosts), count(*), avg(timespan) etc + * @param timestamp + * @param values + */ + public void addDataPoint(long timestamp, List<Double> values) { + // locate timeseries bucket + if (timestamp < startTime || timestamp > endTime) { + LOG.warn("timestamp<startTime or timestamp>endTime, ignore this datapoint." + timestamp + "," + startTime + ":" + endTime); + return; + } + int located = (int)((timestamp - startTime) / interval); + int index = 0; + for (Double src : values) { + double[] timeSeriesValues = aggregatedValues.get(index); + timeSeriesValues[located] += src; + index++; + } + } + + public List<double[]> aggregatedValues() { + return this.aggregatedValues; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java index c0a6e06..ae00fdf 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/TimeSeriesPostFlatAggregateSort.java @@ -25,127 +25,127 @@ import java.util.SortedSet; import java.util.TreeSet; public class TimeSeriesPostFlatAggregateSort { - // private static final Logger logger = - // LoggerFactory.getLogger(PostFlatAggregateSort.class); - - private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue( - Map<List<String>, List<Double>> mapForSort, - List<SortOption> sortOptions) { - SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>( - new MapEntryComparator(sortOptions)); - sortedEntries.addAll(mapForSort.entrySet()); - return sortedEntries; - } - - /** - * sort aggregated results with sort options - * - * @param entity - */ - public static List<Map.Entry<List<String>, List<double[]>>> sort( - Map<List<String>, List<Double>> mapForSort, - Map<List<String>, List<double[]>> 
valueMap, - List<SortOption> sortOptions, int topN) { - - processIndex(sortOptions); - List<Map.Entry<List<String>, List<double[]>>> result = new ArrayList<Map.Entry<List<String>, List<double[]>>>(); - SortedSet<Map.Entry<List<String>, List<Double>>> sortedSet = sortByValue( - mapForSort, sortOptions); - for (Map.Entry<List<String>, List<Double>> entry : sortedSet) { - List<String> key = entry.getKey(); - List<double[]> value = valueMap.get(key); - if (value != null) { - Map.Entry<List<String>, List<double[]>> newEntry = new ImmutableEntry<List<String>, List<double[]>>(key, value); - result.add(newEntry); - if (topN > 0 && result.size() >= topN) { - break; - } - } - } - return result; - } - - private static void processIndex(List<SortOption> sortOptions) { - for (int i = 0; i < sortOptions.size(); ++i) { - SortOption so = sortOptions.get(i); - so.setIndex(i); - } - } - - private static class MapEntryComparator implements - Comparator<Map.Entry<List<String>, List<Double>>> { - private List<SortOption> sortOptions; - - public MapEntryComparator(List<SortOption> sortOptions) { - this.sortOptions = sortOptions; - } - - /** - * default to sort by all groupby fields - */ - @Override - public int compare(Map.Entry<List<String>, List<Double>> e1, - Map.Entry<List<String>, List<Double>> e2) { - int r = 0; - List<String> keyList1 = e1.getKey(); - List<Double> valueList1 = e1.getValue(); - List<String> keyList2 = e2.getKey(); - List<Double> valueList2 = e2.getValue(); - for (SortOption so : sortOptions) { - int index = so.getIndex(); - if (index == -1) { - continue; - } - if (!so.isInGroupby()) { // sort fields come from functions - Double value1 = valueList1.get(index); - Double value2 = valueList2.get(index); - r = value1.compareTo(value2); - } else { // sort fields come from groupby fields - String key1 = keyList1.get(index); - String key2 = keyList2.get(index); - r = key1.compareTo(key2); - } - if (r == 0) - continue; - if (!so.isAscendant()) { - r = -r; - } - return r; - } - // default to sort by groupby fields ascendently - if (r == 0) { // TODO is this check necessary - return new GroupbyFieldsComparator() - .compare(keyList1, keyList2); - } - return r; - } - } - - static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable { - private final K key; - private final V value; - - ImmutableEntry(K key, V value) { - this.key = key; - this.value = value; - } - - @Override - public K getKey() { - return key; - } - - @Override - public V getValue() { - return value; - } - - @Override - public final V setValue(V value) { - throw new UnsupportedOperationException(); - } - - private static final long serialVersionUID = 0; - } + // private static final Logger logger = + // LoggerFactory.getLogger(PostFlatAggregateSort.class); + + private static SortedSet<Map.Entry<List<String>, List<Double>>> sortByValue( + Map<List<String>, List<Double>> mapForSort, + List<SortOption> sortOptions) { + SortedSet<Map.Entry<List<String>, List<Double>>> sortedEntries = new TreeSet<Map.Entry<List<String>, List<Double>>>( + new MapEntryComparator(sortOptions)); + sortedEntries.addAll(mapForSort.entrySet()); + return sortedEntries; + } + + /** + * sort aggregated results with sort options + * + * @param entity + */ + public static List<Map.Entry<List<String>, List<double[]>>> sort( + Map<List<String>, List<Double>> mapForSort, + Map<List<String>, List<double[]>> valueMap, + List<SortOption> sortOptions, int topN) { + + processIndex(sortOptions); + List<Map.Entry<List<String>, List<double[]>>> result = new 
ArrayList<Map.Entry<List<String>, List<double[]>>>(); + SortedSet<Map.Entry<List<String>, List<Double>>> sortedSet = sortByValue( + mapForSort, sortOptions); + for (Map.Entry<List<String>, List<Double>> entry : sortedSet) { + List<String> key = entry.getKey(); + List<double[]> value = valueMap.get(key); + if (value != null) { + Map.Entry<List<String>, List<double[]>> newEntry = new ImmutableEntry<List<String>, List<double[]>>(key, value); + result.add(newEntry); + if (topN > 0 && result.size() >= topN) { + break; + } + } + } + return result; + } + + private static void processIndex(List<SortOption> sortOptions) { + for (int i = 0; i < sortOptions.size(); ++i) { + SortOption so = sortOptions.get(i); + so.setIndex(i); + } + } + + private static class MapEntryComparator implements Comparator<Map.Entry<List<String>, List<Double>>> { + private List<SortOption> sortOptions; + + public MapEntryComparator(List<SortOption> sortOptions) { + this.sortOptions = sortOptions; + } + + /** + * default to sort by all groupby fields + */ + @Override + public int compare(Map.Entry<List<String>, List<Double>> e1, + Map.Entry<List<String>, List<Double>> e2) { + int r = 0; + List<String> keyList1 = e1.getKey(); + List<Double> valueList1 = e1.getValue(); + List<String> keyList2 = e2.getKey(); + List<Double> valueList2 = e2.getValue(); + for (SortOption so : sortOptions) { + int index = so.getIndex(); + if (index == -1) { + continue; + } + if (!so.isInGroupby()) { // sort fields come from functions + Double value1 = valueList1.get(index); + Double value2 = valueList2.get(index); + r = value1.compareTo(value2); + } else { // sort fields come from groupby fields + String key1 = keyList1.get(index); + String key2 = keyList2.get(index); + r = key1.compareTo(key2); + } + if (r == 0) { + continue; + } + if (!so.isAscendant()) { + r = -r; + } + return r; + } + // default to sort by groupby fields ascendently + if (r == 0) { // TODO is this check necessary + return new GroupbyFieldsComparator() + .compare(keyList1, keyList2); + } + return r; + } + } + + static class ImmutableEntry<K, V> implements Map.Entry<K, V>, Serializable { + private final K key; + private final V value; + + ImmutableEntry(K key, V value) { + this.key = key; + this.value = value; + } + + @Override + public K getKey() { + return key; + } + + @Override + public V getValue() { + return value; + } + + @Override + public final V setValue(V value) { + throw new UnsupportedOperationException(); + } + + private static final long serialVersionUID = 0; + } }
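Reviewer note (not part of this commit): the sketch below is a minimal, hypothetical driver showing how the public GroupbyBucket.toMetric(...) helper factored out above pivots a flat result map, whose last key element is the time-series bucket index, into one double[] per aggregate function for each group. It assumes eagle-query-base is on the classpath; the class name ToMetricSketch and the sample values are made up purely for illustration.

import org.apache.eagle.query.aggregate.timeseries.GroupbyBucket;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical driver (not in the commit): demonstrates GroupbyBucket.toMetric,
// which strips the trailing time-series bucket index from each group-by key and
// scatters the aggregated values into fixed-size per-function arrays.
public class ToMetricSketch {
    public static void main(String[] args) {
        // Flat result as TimeSeriesAggregator produces it: the last key element is the
        // bucket index computed as (timestamp - startTime) / intervalms.
        Map<List<String>, List<Double>> flat = new HashMap<>();
        flat.put(Arrays.asList("hostA", "0"), Arrays.asList(3.0)); // bucket 0, count = 3
        flat.put(Arrays.asList("hostA", "2"), Arrays.asList(5.0)); // bucket 2, count = 5
        flat.put(Arrays.asList("hostB", "1"), Arrays.asList(7.0)); // bucket 1, count = 7

        // 3 buckets, 1 aggregate function; numDatapoints mirrors
        // (endTime - 1 - startTime) / intervalms + 1 in getMetric().
        Map<List<String>, List<double[]>> series = GroupbyBucket.toMetric(flat, 3, 1);

        for (Map.Entry<List<String>, List<double[]>> e : series.entrySet()) {
            System.out.println(e.getKey() + " -> " + Arrays.toString(e.getValue().get(0)));
        }
    }
}

With three buckets and a single count function this prints, in unspecified map order, [hostA] -> [3.0, 0.0, 5.0] and [hostB] -> [0.0, 7.0, 0.0]; buckets that received no datapoint keep the double[] default of 0.0, which is why getMetric() sizes the arrays up front rather than growing them lazily.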

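Reviewer note (not part of this commit): a self-contained sketch of the bucket-index arithmetic that TimeSeriesAggregator.accumulate and getMetric rely on, illustrating why the datapoint count is (endTime - 1 - startTime) / intervalms + 1 when startTime is inclusive and endTime is exclusive. The class name and sample values are hypothetical.

// Hypothetical check (not in the commit): shows the exclusive-endTime bucket math.
public class BucketIndexSketch {
    public static void main(String[] args) {
        long startTime = 0L, endTime = 60_000L, intervalms = 15_000L;

        // endTime is exclusive, so the last valid timestamp is endTime - 1,
        // which lands in bucket (endTime - 1 - startTime) / intervalms = 3.
        int numDatapoints = (int) ((endTime - 1 - startTime) / intervalms + 1); // 4

        // accumulate() ignores timestamps >= endTime or < startTime, so every
        // accepted entity maps to a bucket index in [0, numDatapoints).
        long[] samples = {0L, 14_999L, 15_000L, 59_999L};
        for (long ts : samples) {
            long located = (ts - startTime) / intervalms;
            System.out.println(ts + " -> bucket " + located + " of " + numDatapoints);
        }
    }
}

Because the guard in accumulate() rejects out-of-range timestamps before computing located = (timestamp - startTime) / intervalms, every accepted entity falls into [0, numDatapoints), so the arrays built by toMetric are never indexed out of bounds.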