jihoonson commented on a change in pull request #7290: Moving average query pr2
URL: https://github.com/apache/incubator-druid/pull/7290#discussion_r266669556
 
 

 ##########
 File path: 
extensions-contrib/moving-average-query/src/main/java/org/apache/druid/query/movingaverage/MovingAverageIterable.java
 ##########
 @@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.movingaverage;
+
+import org.apache.druid.data.input.MapBasedRow;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Yielder;
+import org.apache.druid.query.aggregation.Aggregator;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.PostAggregator;
+import org.apache.druid.query.dimension.DimensionSpec;
+import org.apache.druid.query.movingaverage.averagers.Averager;
+import org.apache.druid.query.movingaverage.averagers.AveragerFactory;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.DimensionSelector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * {@link MovingAverageIterable} iterates over days {@link RowBucket}, 
producing rows for each dimension combination,
+ * filling in missing entries with "empty" rows so that the averaging buckets 
have enough data to operate on.
+ * It then computes the moving average on the buckets and returns the row.
+ * See {@link MovingAverageIterator#computeMovingAverage(Map, Row, boolean)} 
for more details.
+ */
+public class MovingAverageIterable implements Iterable<Row>
+{
+
+  private final Sequence<RowBucket> seq;
+  private final Collection<DimensionSpec> dims;
+  private final Collection<AveragerFactory<?, ?>> factories;
+  private final Map<String, PostAggregator> postAggMap;
+  private final Map<String, AggregatorFactory> aggMap;
+  private final Map<String, Object> fakeEvents;
+
+  public MovingAverageIterable(
+      Sequence<RowBucket> buckets,
+      Collection<DimensionSpec> dims,
+      Collection<AveragerFactory<?, ?>> factories,
+      List<PostAggregator> postAggList,
+      List<AggregatorFactory> aggList
+  )
+  {
+    this.dims = dims;
+    this.factories = factories;
+    this.seq = buckets;
+
+    postAggMap = postAggList.stream().collect(Collectors.toMap(postAgg -> 
postAgg.getName(), postAgg -> postAgg));
+    aggMap = aggList.stream().collect(Collectors.toMap(agg -> agg.getName(), 
agg -> agg));
+    fakeEvents = generateFakeEventsFromAggregators(aggMap, postAggMap);
+  }
+
+  // Build a list of dummy events from Aggregators/PostAggregators to be used 
by Iterator to build fake rows.
+  // These fake rows will be used by computeMovingAverage() in skip=true mode.
+  // See fakeEventsCopy in internalNext() and computeMovingAverage() 
documentation.
+  private Map<String, Object> generateFakeEventsFromAggregators(Map<String, 
AggregatorFactory> aggMap,
+                                                                Map<String, 
PostAggregator> postAggMap)
+  {
+    Map<String, Object> fakeEvents = new LinkedHashMap<>();
+    aggMap.values().forEach(agg -> {
+      Aggregator aggFactorized = 
agg.factorize(getEmptyColumnSelectorFactory());
+      fakeEvents.put(agg.getName(), aggFactorized.get());
+    });
+    postAggMap.values().forEach(postAgg -> fakeEvents.put(postAgg.getName(), 
postAgg.compute(fakeEvents)));
+    return fakeEvents;
+  }
+
+  @Nonnull
+  private ColumnSelectorFactory getEmptyColumnSelectorFactory()
+  {
+    return new ColumnSelectorFactory()
+    {
+      @Override
+      public DimensionSelector makeDimensionSelector(DimensionSpec 
dimensionSpec)
+      {
+        // Generating empty records while aggregating on Filtered aggregators 
requires a dimension selector
+        // for initialization.  This dimension selector is not actually used 
for generating values
+        return DimensionSelector.constant(null);
+      }
+
+      @Override
+      public ColumnValueSelector makeColumnValueSelector(String s)
+      {
+        return null;
 
 Review comment:
   I guess this should return `NilColumnValueSelector`. Otherwise, this will throw 
an NPE if any aggregator needs a `ColumnValueSelector`, as 
`HyperUniquesAggregatorFactory` does.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to