LakshSingla commented on code in PR #16230:
URL: https://github.com/apache/druid/pull/16230#discussion_r1580515287


##########
processing/src/main/java/org/apache/druid/query/aggregation/firstlast/FirstLastVectorAggregator.java:
##########
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.aggregation.firstlast;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.collections.SerializablePair;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.aggregation.VectorAggregator;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+import org.apache.druid.segment.vector.VectorValueSelector;
+
+import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
+
+/**
+ * Base type for vectorized version of on heap 'last' aggregator for primitive 
numeric column selectors..
+ */
+public abstract class FirstLastVectorAggregator<RhsType, PairType extends 
SerializablePair<Long, RhsType>>
+    implements VectorAggregator
+{
+  public static final int NULLITY_OFFSET = Long.BYTES;
+  public static final int VALUE_OFFSET = NULLITY_OFFSET + Byte.BYTES;
+
+  @Nullable
+  private final VectorValueSelector timeSelector;
+  @Nullable
+  private final VectorValueSelector valueSelector;
+  @Nullable
+  private final VectorObjectSelector objectSelector;
+  private final SelectionPredicate selectionPredicate;
+  private final boolean useDefault = NullHandling.replaceWithDefault();
+
+
+  /**
+   * TODO(laksh): valueSelector isn't used much here, only checked for 
nullity. However while calling the methods of the subclasses,
+   *  it gets used because it is clearer to know which selector is getting 
used. This gets used
+   *
+   *  timeSelector can be null, however aggregate functions are no-op then. 
Weird, since all numeric versions supply the selector,
+   *  only the case when the aggregator's capabilities are not present in the 
string version do we hit this case (not sure why this is
+   *  a possibility, and what benefit does it provides)
+   */
+  public FirstLastVectorAggregator(
+      @Nullable VectorValueSelector timeSelector,
+      @Nullable VectorValueSelector valueSelector,
+      @Nullable VectorObjectSelector objectSelector,
+      SelectionPredicate selectionPredicate
+  )
+  {
+    if (timeSelector != null) {
+      Preconditions.checkArgument(
+          (valueSelector != null && objectSelector == null) || (valueSelector 
== null && objectSelector != null),
+          "exactly one of 'valueSelector' and 'objectSelector' must be 
provided"
+      );
+    }
+    this.timeSelector = timeSelector;
+    this.valueSelector = valueSelector;
+    this.objectSelector = objectSelector;
+    this.selectionPredicate = selectionPredicate;
+  }
+
+  @Override
+  public void aggregate(ByteBuffer buf, int position, int startRow, int endRow)
+  {
+    // Not a normal case, and this doesn't affect the folding. timeSelectors 
should be present (albeit irrelevent) when folding.
+    // timeSelector == null means that the aggregating column's capabilities 
aren't known, and it only happens for a special case
+    // while building string aggregator
+    if (timeSelector == null) {
+      return;
+    }
+
+    // If objectSelector isn't null, then the objects might be folded up. If 
that's the case, whatever's represented by
+    // the timeSelector doesn't hold any relevance.
+    if (objectSelector != null) {
+      final Object[] maybeFoldedObjects = objectSelector.getObjectVector();
+      final boolean[] timeNullityVector = timeSelector.getNullVector();
+      final long[] timeVector = timeSelector.getLongVector();
+
+      PairType selectedPair = null;
+
+      for (int index = startRow; index < endRow; ++index) {
+
+        PairType pair = readPairFromVectorSelectors(timeNullityVector, 
timeVector, maybeFoldedObjects, index);
+        if (pair != null) {
+          if (selectedPair == null) {
+            selectedPair = pair;
+          } else if (selectionPredicate.apply(pair.lhs, selectedPair.lhs)) {
+            selectedPair = pair;
+          }
+        }
+      }
+      // Something's been selected of the row vector
+      if (selectedPair != null) {
+        // Compare the latest value of the folded up row vector to the latest 
value in the buffer
+        if (selectionPredicate.apply(selectedPair.lhs, buf.getLong(position))) 
{
+          if (selectedPair.rhs != null) {
+            putValue(buf, position, selectedPair.lhs, selectedPair.rhs);
+          } else if (useDefault) {
+            putDefaultValue(buf, position, selectedPair.lhs);
+          } else {
+            putNull(buf, position, selectedPair.lhs);
+          }
+        }
+      }
+
+    } else {
+      // No object selector, no folding present. Check the timeSelector before 
checking the valueSelector
+      final boolean[] timeNullityVector = timeSelector.getNullVector();
+      final long[] timeVector = timeSelector.getLongVector();
+      final boolean[] valueNullityVector = valueSelector.getNullVector();
+      Integer selectedIndex = null;
+
+      for (int index = startRow; index < endRow; ++index) {
+        if (timeNullityVector != null && timeNullityVector[index]) {
+          // Don't aggregate values where time isn't present
+          continue;
+        }
+        // Find the latest time inside the vector objects
+        if (selectedIndex == null) {
+          selectedIndex = index;
+        } else {
+          if (selectionPredicate.apply(timeVector[index], 
timeVector[selectedIndex])) {
+            selectedIndex = index;
+          }
+        }
+      }
+      // Compare the selectedIndex's value to the value on the buffer. This 
way, we write to the buffer only once
+      // Weeds out empty vectors, where endRow == startRow
+      if (selectedIndex != null) {

Review Comment:
   Yes. In that case we want to leave the buffer as is, because the buffer may
contain a first/last value already computed from other batches of rows.
   If all the time values are null, there is nothing to worry about: the serialized
pair would be {-DateTimes.MAX, 0}, which is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to