szlta commented on a change in pull request #538: HIVE-21217: Optimize range calculation for PTF
URL: https://github.com/apache/hive/pull/538#discussion_r258088436
 
 

 ##########
 File path: ql/src/java/org/apache/hadoop/hive/ql/udf/ptf/ValueBoundaryScanner.java
 ##########
 @@ -44,10 +49,207 @@ public ValueBoundaryScanner(BoundaryDef start, BoundaryDef end, boolean nullsLas
     this.nullsLast = nullsLast;
   }
 
+  public abstract Object computeValue(Object row) throws HiveException;
+
+  /**
+   * Checks whether the distance from v2 to v1 is greater than the given amt.
+   * @return True if the value of v1 - v2 is greater than amt, or if either value is null.
+   */
+  public abstract boolean isDistanceGreater(Object v1, Object v2, int amt);
+
+  /**
+   * Checks if the values of v1 and v2 are the same.
+   * @return True if both values are the same or both are nulls.
+   */
+  public abstract boolean isEqual(Object v1, Object v2);
+
   public abstract int computeStart(int rowIdx, PTFPartition p) throws HiveException;
 
   public abstract int computeEnd(int rowIdx, PTFPartition p) throws HiveException;
 
+  /**
+   * Checks and maintains cache content - keeps the cache window around the current row,
+   * thereby making it follow the current progress of the scan.
+   * @param rowIdx current row
+   * @param p current partition for the PTF operator
+   * @throws HiveException
+   */
+  public void handleCache(int rowIdx, PTFPartition p) throws HiveException {
+    BoundaryCache cache = p.getBoundaryCache();
+    if (cache == null) {
+      return;
+    }
+
+    //Start of partition
+    if (rowIdx == 0) {
+      cache.clear();
+    }
+    if (cache.isComplete()) {
+      return;
+    }
+
+    int cachePos = cache.approxCachePositionOf(rowIdx);
+
+    if (cache.isEmpty()) {
+      fillCacheUntilEndOrFull(rowIdx, p);
+    } else if (cachePos > 50 && cachePos <= 75) {
+      if (!start.isPreceding() && end.isFollowing()) {
+        cache.evictHalf();
+        fillCacheUntilEndOrFull(rowIdx, p);
+      }
+    } else if (cachePos > 75 && cachePos <= 95) {
+      if (start.isPreceding() && end.isFollowing()) {
+        cache.evictHalf();
+        fillCacheUntilEndOrFull(rowIdx, p);
+      }
+    } else if (cachePos >= 95) {
+      if (start.isPreceding() && !end.isFollowing()) {
+        cache.evictHalf();
+        fillCacheUntilEndOrFull(rowIdx, p);
+      }
+
+    }
+  }
+
+  /**
+   * Inserts values into the cache starting from rowIdx in the current partition p. Stops if
+   * the cache reaches its maximum size or we run out of rows in p.
+   * @param rowIdx
+   * @param p
+   * @throws HiveException
+   */
+  private void fillCacheUntilEndOrFull(int rowIdx, PTFPartition p) throws HiveException {
+    BoundaryCache cache = p.getBoundaryCache();
+    if (cache == null || p.size() <= 0) {
+      return;
+    }
+
+    //If we continue building cache
+    Map.Entry<Integer, Object> ceilingEntry = cache.getMaxEntry();
+    if (ceilingEntry != null) {
+      rowIdx = ceilingEntry.getKey();
+    }
+
+    Object rowVal = null;
+    Object lastRowVal = null;
+
+    while (rowIdx < p.size()) {
+      rowVal = computeValue(p.getAt(rowIdx));
+      if (!isEqual(rowVal, lastRowVal)){
+        if (!cache.putIfNotFull(rowIdx, rowVal)){
 
 Review comment:
   Agree, fixed!
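   For context, here is a minimal standalone sketch of how the three abstract hooks above (computeValue, isDistanceGreater, isEqual) might behave for a long-typed ORDER BY value. This is illustrative only and not part of the patch: the class name and the assumption that a row is already a Long are made up for the example, and Hive's real scanner subclasses may differ.

     // Illustrative sketch only, not part of the patch.
     public class LongBoundarySketch {

       // computeValue(): extract the ordering value from a row.
       // For simplicity the "row" is assumed to already be a Long here.
       static Long computeValue(Object row) {
         return (Long) row;
       }

       // isDistanceGreater(): true if v1 - v2 exceeds amt; a null on either side is
       // treated as an unbounded distance (both-null is left as "not greater" here).
       static boolean isDistanceGreater(Object v1, Object v2, int amt) {
         if (v1 != null && v2 != null) {
           return (Long) v1 - (Long) v2 > amt;
         }
         return v1 != null || v2 != null;
       }

       // isEqual(): true if both values are equal, or both are null.
       static boolean isEqual(Object v1, Object v2) {
         if (v1 != null && v2 != null) {
           return v1.equals(v2);
         }
         return v1 == null && v2 == null;
       }

       public static void main(String[] args) {
         System.out.println(isDistanceGreater(10L, 3L, 5));   // true:  7 > 5
         System.out.println(isDistanceGreater(10L, 8L, 5));   // false: 2 > 5 does not hold
         System.out.println(isDistanceGreater(null, 8L, 5));  // true:  one side is null
         System.out.println(isEqual(4L, 4L));                 // true
         System.out.println(isEqual(null, null));             // true
       }
     }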

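   The handleCache()/fillCacheUntilEndOrFull() pair above maintains a sliding window over the partition: once the current row drifts far enough into the cached range (the 50/75/95 percent thresholds, chosen by the frame's PRECEDING/FOLLOWING shape), the older half of the cache is evicted and the window is refilled toward the end of the partition. Below is a rough, self-contained illustration of that bookkeeping using a plain TreeMap instead of Hive's BoundaryCache; the method names only loosely mirror the ones visible in the diff (approxCachePositionOf, evictHalf, putIfNotFull, getMaxEntry) and are otherwise assumptions.

     // Illustrative sketch only, not part of the patch: a TreeMap stand-in for the
     // sliding boundary cache of rowIdx -> boundary value entries.
     import java.util.Map;
     import java.util.TreeMap;

     public class BoundaryCacheSketch {
       private final TreeMap<Integer, Object> cache = new TreeMap<>();
       private final int maxSize;

       BoundaryCacheSketch(int maxSize) {
         this.maxSize = maxSize;
       }

       // Rough percentage position of rowIdx inside the currently cached key range.
       int approxPositionOf(int rowIdx) {
         if (cache.isEmpty()) {
           return 0;
         }
         int first = cache.firstKey();
         int last = cache.lastKey();
         return last == first ? 100 : (int) (100.0 * (rowIdx - first) / (last - first));
       }

       // Drop the lower half of the cached window, i.e. the rows already scanned past.
       void evictOlderHalf() {
         if (cache.isEmpty()) {
           return;
         }
         int mid = cache.firstKey() + (cache.lastKey() - cache.firstKey()) / 2;
         cache.headMap(mid, false).clear();
       }

       // Insert an entry unless the cache is already at capacity; a false return tells
       // the caller the window is full and it can stop reading further rows for now.
       boolean putIfNotFull(int rowIdx, Object value) {
         if (cache.size() >= maxSize) {
           return false;
         }
         cache.put(rowIdx, value);
         return true;
       }

       // Highest cached row index; refilling resumes from here, as in the patch's
       // use of getMaxEntry() before the while loop.
       Map.Entry<Integer, Object> maxEntry() {
         return cache.lastEntry();
       }
     }

   Storing only one entry per distinct boundary value (guarded by the isEqual() check in fillCacheUntilEndOrFull()) presumably keeps the window compact even when many consecutive rows share the same ORDER BY value.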
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
