Github user jihoonson commented on a diff in the pull request:
https://github.com/apache/tajo/pull/454#discussion_r28643064
--- Diff:
tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/WindowAggExec.java
---
@@ -347,6 +350,372 @@ private void finalizeWindow() {
}
}
+
+ /**
+  * Evaluates a function that does not use a window frame and writes one result
+  * per row of the current partition into evaluatedTuples.
+  *
+  * Every accumulated input row is merged, in order, into the function's context.
+  * When windowFuncFlags[functionIndex] is set, the running result after each merge
+  * is written to that row. When aggFuncFlags[functionIndex] is set, the result
+  * after all rows have been merged is written to every output row.
+  *
+  * @param functionIndex index into functions/contexts; the result is written to
+  *                      column nonFunctionColumnNum + functionIndex
+  */
+ private void evaluateNonframableWindowFunction(int functionIndex) {
+   for (int i = 0; i < accumulatedInTuples.size(); i++) {
+     Tuple inTuple = accumulatedInTuples.get(i);
+     Tuple outTuple = evaluatedTuples.get(i);
+
+     functions[functionIndex].merge(contexts[functionIndex], inTuple);
+
+     if (windowFuncFlags[functionIndex]) {
+       // per-row (running) result, taken right after this row was merged
+       Datum result = functions[functionIndex].terminate(contexts[functionIndex]);
+       outTuple.put(nonFunctionColumnNum + functionIndex, result);
+     }
+   }
+
+   if (aggFuncFlags[functionIndex] && !evaluatedTuples.isEmpty()) {
+     // The context is no longer modified once every row has been merged, so the
+     // final aggregate is loop-invariant: terminate once and reuse it for all rows.
+     // NOTE(review): all output rows now share one Datum instance; this assumes the
+     // returned Datum is not mutated afterwards — confirm.
+     Datum result = functions[functionIndex].terminate(contexts[functionIndex]);
+     for (int i = 0; i < evaluatedTuples.size(); i++) {
+       evaluatedTuples.get(i).put(nonFunctionColumnNum + functionIndex, result);
+     }
+   }
+ }
+
+ /**
+ * Evaluates a frame-aware window function for every row of the current partition.
+ * Frame-specific input is currently produced only for first_value and last_value.
+ *
+ * For each row i, a frame [frameStart, frameEnd] is derived from the frame type
+ * and the start/end bound offsets. For RANGE frames with an ORDER BY
+ * (comp != null), the frame end is extended to the last row sharing frameEnd's
+ * order-by value. The frame start is moved to the first row of its group of equal
+ * order-by values, as tracked by sameStartRange/actualStart. The row chosen by
+ * getFunctionSpecificInput is merged into the function context, and the terminated
+ * result is written to column nonFunctionColumnNum + functionIndex of
+ * evaluatedTuples[i]. When no row is chosen, NullDatum is written instead.
+ *
+ * @param functionIndex index of the function in functions/contexts
+ * @param windowFrame frame specification (type, unit, start and end bounds)
+ * @param comp order-by comparator, or null when there is no ORDER BY
+ */
+ private void evaluateFramableWindowFunction(int functionIndex,
LogicalWindowSpec.LogicalWindowFrame windowFrame,
+ BaseTupleComparator comp) {
+ LogicalWindowSpec.LogicalWindowFrame.WindowFrameType windowFrameType =
windowFrame.getFrameType();
+ WindowSpec.WindowFrameUnit windowFrameUnit =
windowFrame.getFrameUnit();
+ int windowFrameStartOffset = windowFrame.getStartBound().getNumber();
+ int windowFrameEndOffset = windowFrame.getEndBound().getNumber();
+
+ String funcName = functions[functionIndex].getName();
+ // RANGE bookkeeping: sameStartRange/sameEndRange count the rows of the current
+ // equal-key group that have not been consumed yet. When a counter reaches 0, the
+ // next row starts a new group and the count is recomputed.
+ // frameStart/frameEnd default to the whole partition. actualStart remembers the
+ // frame start recorded at the first row of the current start group.
+ int sameStartRange = 0; int sameEndRange = 0;
+ int frameStart = 0, frameEnd = accumulatedInTuples.size() - 1;
+ int actualStart = 0;
+
+ for (int i = 0; i < accumulatedInTuples.size(); i++) {
+ // Raw bounds for row i. Bounds not assigned by this frame type keep their
+ // previous value (initially the whole partition).
+ switch(windowFrameType) {
+ case TO_CURRENT_ROW:
+ frameEnd = i + windowFrameEndOffset; break;
+ case FROM_CURRENT_ROW:
+ frameStart = i + windowFrameStartOffset; break;
+ case SLIDING_WINDOW:
+ frameStart = i + windowFrameStartOffset;
+ frameEnd = i + windowFrameEndOffset;
+ break;
+ }
+
+ // RANGE window frame SHOULD include all the rows that have the same
order by value with the current row
+ // if comp == null, there is no order by and window frame is set
as the entire partition
+ // NOTE(review): numOfTuplesWithTheSameKeyValue indexes accumulatedInTuples
+ // directly, so a frameEnd/frameStart outside [0, size) would throw
+ // IndexOutOfBoundsException here. Presumably RANGE frames only reach this with
+ // zero offsets — confirm.
+ if (comp != null && windowFrameUnit ==
WindowSpec.WindowFrameUnit.RANGE) {
+ // move frame end point to the last row of the same order by value
+ if (sameEndRange == 0) {
+ sameEndRange = numOfTuplesWithTheSameKeyValue(frameEnd, comp,
true);
+ }
+ sameEndRange --;
+ frameEnd += sameEndRange;
+
+ // move frame start point to the first row of the same order by
value
+ if (sameStartRange == 0) {
+ sameStartRange = numOfTuplesWithTheSameKeyValue(frameStart,
comp, true);
+ actualStart = frameStart;
+ }
+ sameStartRange --;
+ frameStart = actualStart;
+ }
+ // As the number of built-in window functions that support window
frame is small,
+ // special treatment for each function seems to be reasonable
+ // A null input (unsupported function, empty frame, or bound outside the
+ // partition) leaves this row's result as NULL.
+ Tuple inTuple = getFunctionSpecificInput(funcName, frameStart,
frameEnd);
+ Datum result = NullDatum.get();
+
+ // NOTE(review): contexts[functionIndex] is never reset in this method, so it
+ // accumulates across rows. This presumably relies on merge() for these functions
+ // replacing the previously stored value — confirm.
+ if (inTuple != null) {
+ functions[functionIndex].merge(contexts[functionIndex], inTuple);
+ result =
functions[functionIndex].terminate(contexts[functionIndex]);
+ }
+ Tuple outTuple = evaluatedTuples.get(i);
+ outTuple.put(nonFunctionColumnNum + functionIndex, result);
+ }
+ }
+
+ // it returns the number of Tuples with the same order by key value
+ // if only one Tuple for the given order by key exists, it returns 1
+ private int numOfTuplesWithTheSameKeyValue(int startOffset,
BaseTupleComparator comp, boolean isForward) {
+ int numberOfTuples = 0;
+
+ Tuple inTuple = accumulatedInTuples.get(startOffset);
+ if (isForward) {
+ do {
+ numberOfTuples++;
+ startOffset++;
+ } while (startOffset < accumulatedInTuples.size() &&
comp.compare(accumulatedInTuples.get(startOffset), inTuple) == 0);
+ } else { // backward direction
+ do {
+ numberOfTuples++;
+ startOffset--;
+ } while (startOffset >= 0 &&
comp.compare(accumulatedInTuples.get(startOffset), inTuple) == 0);
+ }
+
+ return numberOfTuples;
+ }
+
+ private Tuple getFunctionSpecificInput(String funcName, int dataStart,
int dataEnd) {
+ if (funcName.equals("first_value")) {
+ // check the frame start is within the partition
+ if (dataStart <= dataEnd && dataStart >= 0 && dataStart <
accumulatedInTuples.size()) {
+ return accumulatedInTuples.get(dataStart);
+ }
+ } else if (funcName.equals("last_value")) {
+ // check the frame end is within the partition
+ if (dataStart <= dataEnd && dataEnd >= 0 && dataEnd <
accumulatedInTuples.size()) {
+ return accumulatedInTuples.get(dataEnd);
+ }
+ }
+ return null;
+ }
+
+ private void evaluateAggregationFunction(int functionIndex,
LogicalWindowSpec.LogicalWindowFrame windowFrame,
--- End diff --
This function still looks complicated. Would you mind dividing it into
several smaller functions?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---