danny0405 commented on a change in pull request #1761:
URL: https://github.com/apache/calcite/pull/1761#discussion_r411103203
##########
File path:
core/src/main/java/org/apache/calcite/adapter/enumerable/EnumUtils.java
##########
@@ -796,4 +803,244 @@ static Expression windowSelector(
outputPhysType.record(expressions),
parameter);
}
+
+ /**
+  * Creates an enumerable that applies sessionization to the rows of the
+  * input enumerator, based on a specified key. Rows are windowed into
+  * sessions separated by periods with no input for at least the duration
+  * given by the {@code gap} parameter.
+  *
+  * @param inputEnumerator enumerator that supplies the rows
+  * @param indexOfWatermarkedColumn index of the column whose value is
+  *     read as the timestamp of a row
+  * @param indexOfKeyColumn index of the column used as the session key
+  * @param gap length of the initial window opened by each row
+  * @return enumerable whose enumerators wrap {@code inputEnumerator}
+  */
+ public static Enumerable<Object[]> sessionize(
+     Enumerator<Object[]> inputEnumerator,
+     int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+   final Enumerable<Object[]> sessionized =
+       new AbstractEnumerable<Object[]>() {
+         @Override public Enumerator<Object[]> enumerator() {
+           final Enumerator<Object[]> sessions =
+               new SessionizationEnumerator(inputEnumerator,
+                   indexOfWatermarkedColumn, indexOfKeyColumn, gap);
+           return sessions;
+         }
+       };
+   return sessionized;
+ }
+
+ private static class SessionizationEnumerator implements
Enumerator<Object[]> {
+ private final Enumerator<Object[]> inputEnumerator;
+ private final int indexOfWatermarkedColumn;
+ private final int indexOfKeyColumn;
+ private final long gap;
+ private LinkedList<Object[]> list;
+ private boolean initialized;
+
+ SessionizationEnumerator(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, int indexOfKeyColumn, long gap) {
+ this.inputEnumerator = inputEnumerator;
+ this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+ this.indexOfKeyColumn = indexOfKeyColumn;
+ this.gap = gap;
+ list = new LinkedList<>();
+ initialized = false;
+ }
+
+ @Override public Object[] current() {
+ if (!initialized) {
+ initialize();
+ initialized = true;
+ }
+ return list.pollFirst();
+ }
+
+ @Override public boolean moveNext() {
+ return initialized ? list.size() > 0 : inputEnumerator.moveNext();
+ }
+
+ @Override public void reset() {
+ list.clear();
+ inputEnumerator.reset();
+ initialized = false;
+ }
+
+ @Override public void close() {
+ list.clear();
+ inputEnumerator.close();
+ initialized = false;
+ }
+
+ private void initialize() {
+ List<Object[]> elements = new ArrayList<>();
+ // initialize() will be called when inputEnumerator.moveNext() is true,
+ // thus firstly should take the current element.
+ elements.add(inputEnumerator.current());
+ // sessionization needs to see all data.
+ while (inputEnumerator.moveNext()) {
+ elements.add(inputEnumerator.current());
+ }
+
+ Map<Object, SortedMultiMap<Pair<Long, Long>, Object[]>> sessionKeyMap =
new HashMap<>();
+ for (Object[] element : elements) {
+ sessionKeyMap.putIfAbsent(element[indexOfKeyColumn], new
SortedMultiMap<>());
+ Pair initWindow = computeInitWindow(
+ SqlFunctions.toLong(element[indexOfWatermarkedColumn]), gap);
+ sessionKeyMap.get(element[indexOfKeyColumn]).putMulti(initWindow,
element);
+ }
+
+ // merge per key session windows if there is any overlap between windows.
+ for (Map.Entry<Object, SortedMultiMap<Pair<Long, Long>, Object[]>>
perKeyEntry
+ : sessionKeyMap.entrySet()) {
+ Map<Pair<Long, Long>, List<Object[]>> finalWindowElementsMap = new
HashMap<>();
+ Pair<Long, Long> currentWindow = null;
+ List<Object[]> tempElementList = new ArrayList<>();
+ for (Map.Entry<Pair<Long, Long>, List<Object[]>> sessionEntry
+ : perKeyEntry.getValue().entrySet()) {
+ // check the next window can be merged.
+ if (currentWindow == null || !isOverlapped(currentWindow,
sessionEntry.getKey())) {
+ // cannot merge window as there is no overlap
+ if (currentWindow != null) {
+ finalWindowElementsMap.put(currentWindow, new
ArrayList<>(tempElementList));
+ }
+
+ currentWindow = sessionEntry.getKey();
+ tempElementList.clear();
+ tempElementList.addAll(sessionEntry.getValue());
+ } else {
+ // merge windows.
+ currentWindow = mergeWindows(currentWindow, sessionEntry.getKey());
+ // merge elements in windows.
+ tempElementList.addAll(sessionEntry.getValue());
+ }
+ }
+
+ if (!tempElementList.isEmpty()) {
+ finalWindowElementsMap.put(currentWindow, new
ArrayList<>(tempElementList));
+ }
+
+ // construct final results from finalWindowElementsMap.
+ for (Map.Entry<Pair<Long, Long>, List<Object[]>>
finalWindowElementsEntry
+ : finalWindowElementsMap.entrySet()) {
+ for (Object[] element : finalWindowElementsEntry.getValue()) {
+ Object[] curWithWindow = new Object[element.length + 2];
+ System.arraycopy(element, 0, curWithWindow, 0, element.length);
+ curWithWindow[element.length] =
finalWindowElementsEntry.getKey().left;
+ curWithWindow[element.length + 1] =
finalWindowElementsEntry.getKey().right;
+ list.offer(curWithWindow);
+ }
+ }
+ }
+ }
+
+ private boolean isOverlapped(Pair<Long, Long> a, Pair<Long, Long> b) {
+ return !(b.left >= a.right);
+ }
+
+ private Pair<Long, Long> mergeWindows(Pair<Long, Long> a, Pair<Long, Long>
b) {
+ return new Pair<>(a.left <= b.left ? a.left : b.left, a.right >= b.right
? a.right : b.right);
+ }
+
+ private Pair<Long, Long> computeInitWindow(long ts, long gap) {
+ return new Pair<>(ts, ts + gap);
+ }
+ }
+
+ /**
+  * Creates an enumerable that applies hopping to each row of the input
+  * enumerator and produces at least one row for each input row.
+  *
+  * @param inputEnumerator enumerator that supplies the rows
+  * @param indexOfWatermarkedColumn index of the watermarked column
+  * @param emitFrequency emit frequency handed to the hop enumerator
+  * @param intervalSize interval size handed to the hop enumerator
+  * @return enumerable whose enumerators wrap {@code inputEnumerator}
+  */
+ public static Enumerable<Object[]> hopping(
+     Enumerator<Object[]> inputEnumerator,
+     int indexOfWatermarkedColumn, long emitFrequency,
+     long intervalSize) {
+   final Enumerable<Object[]> hopped =
+       new AbstractEnumerable<Object[]>() {
+         @Override public Enumerator<Object[]> enumerator() {
+           final Enumerator<Object[]> hops =
+               new HopEnumerator(inputEnumerator,
+                   indexOfWatermarkedColumn, emitFrequency, intervalSize);
+           return hops;
+         }
+       };
+   return hopped;
+ }
+
+ private static class HopEnumerator implements Enumerator<Object[]> {
+ private final Enumerator<Object[]> inputEnumerator;
+ private final int indexOfWatermarkedColumn;
+ private final long emitFrequency;
+ private final long intervalSize;
+ private LinkedList<Object[]> list;
+
+ HopEnumerator(Enumerator<Object[]> inputEnumerator,
+ int indexOfWatermarkedColumn, long emitFrequency, long intervalSize) {
+ this.inputEnumerator = inputEnumerator;
+ this.indexOfWatermarkedColumn = indexOfWatermarkedColumn;
+ this.emitFrequency = emitFrequency;
Review comment:
Using "table function" sounds better; I don't really know what a
"table-valued function" means.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]