bvarghese1 commented on code in PR #25753: URL: https://github.com/apache/flink/pull/25753#discussion_r1942258579
########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/over/NonTimeRangeUnboundedPrecedingFunction.java: ########## @@ -0,0 +1,578 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.over; + +import org.apache.flink.api.common.functions.OpenContext; +import org.apache.flink.api.common.state.MapState; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.ValueState; +import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.Types; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.ListTypeInfo; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.streaming.api.functions.KeyedProcessFunction; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.table.runtime.dataview.PerKeyStateDataViewStore; +import org.apache.flink.table.runtime.functions.KeyedProcessFunctionWithCleanupState; +import 
org.apache.flink.table.runtime.generated.AggsHandleFunction; +import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; +import org.apache.flink.table.runtime.generated.RecordEqualiser; +import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.Collector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** A basic implementation to support non-time range unbounded over aggregate with retract mode. */ +public class NonTimeRangeUnboundedPrecedingFunction<K> + extends KeyedProcessFunctionWithCleanupState<K, RowData, RowData> { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = + LoggerFactory.getLogger(NonTimeRangeUnboundedPrecedingFunction.class); + + private final GeneratedAggsHandleFunction generatedAggsHandler; + private final GeneratedRecordEqualiser generatedRecordEqualiser; + private final GeneratedRecordEqualiser generatedSortKeyEqualiser; + private final GeneratedRecordComparator generatedRecordComparator; + + // The util to compare two rows based on the sort attribute. + private transient Comparator<RowData> sortKeyComparator; + + protected final KeySelector<RowData, RowData> sortKeySelector; + // The record equaliser used to equal RowData. 
+ private transient RecordEqualiser valueEqualiser; + private transient RecordEqualiser sortKeyEqualiser; + + private final LogicalType[] accTypes; + private final LogicalType[] inputFieldTypes; + private final LogicalType[] sortKeyTypes; + protected transient JoinedRowData output; + + // state to hold the Long ID counter + private transient ValueState<Long> idState; + + // state to hold a sorted list each containing a tuple of sort key with an artificial id + // The artificial id acts as the key in the valueMapState + private transient ValueState<List<Tuple2<RowData, List<Long>>>> sortedListState; + // state to hold rows until state ttl expires + private transient MapState<Long, RowData> valueMapState; + // state to hold sortKey and its associated accumulator + private transient MapState<RowData, RowData> accMapState; + + protected transient AggsHandleFunction currFunction; + + public NonTimeRangeUnboundedPrecedingFunction( + long minRetentionTime, + long maxRetentionTime, + GeneratedAggsHandleFunction genAggsHandler, + GeneratedRecordEqualiser genRecordEqualiser, + GeneratedRecordEqualiser genSortKeyEqualiser, + GeneratedRecordComparator genRecordComparator, + LogicalType[] accTypes, + LogicalType[] inputFieldTypes, + LogicalType[] sortKeyTypes, + RowDataKeySelector sortKeySelector) { + super(minRetentionTime, maxRetentionTime); + this.generatedAggsHandler = genAggsHandler; + this.generatedRecordEqualiser = genRecordEqualiser; + this.generatedSortKeyEqualiser = genSortKeyEqualiser; + this.generatedRecordComparator = genRecordComparator; + this.accTypes = accTypes; + this.inputFieldTypes = inputFieldTypes; + this.sortKeyTypes = sortKeyTypes; + this.sortKeySelector = sortKeySelector; + } + + @Override + public void open(OpenContext openContext) throws Exception { + // Initialize agg functions + currFunction = + generatedAggsHandler.newInstance(getRuntimeContext().getUserCodeClassLoader()); + currFunction.open(new PerKeyStateDataViewStore(getRuntimeContext())); + + 
// Initialize output record + output = new JoinedRowData(); + + // Initialize value/row equaliser + valueEqualiser = + generatedRecordEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader()); + + // Initialize sortKey equaliser + sortKeyEqualiser = + generatedSortKeyEqualiser.newInstance(getRuntimeContext().getUserCodeClassLoader()); + + // Initialize sort comparator + sortKeyComparator = + generatedRecordComparator.newInstance(getRuntimeContext().getUserCodeClassLoader()); + + // Initialize state to maintain id counter + ValueStateDescriptor<Long> idStateDescriptor = + new ValueStateDescriptor<Long>("idState", Long.class); + idState = getRuntimeContext().getState(idStateDescriptor); + + // Input elements are all binary rows as they came from network + InternalTypeInfo<RowData> inputRowTypeInfo = InternalTypeInfo.ofFields(inputFieldTypes); + InternalTypeInfo<RowData> sortKeyRowTypeInfo = InternalTypeInfo.ofFields(this.sortKeyTypes); + + // Initialize state which maintains a sorted list of tuples(sortKey, List of IDs) + ListTypeInfo<Long> idListTypeInfo = new ListTypeInfo<Long>(Types.LONG); + ListTypeInfo<Tuple2<RowData, List<Long>>> listTypeInfo = + new ListTypeInfo<>(new TupleTypeInfo<>(sortKeyRowTypeInfo, idListTypeInfo)); + ValueStateDescriptor<List<Tuple2<RowData, List<Long>>>> sortedListStateDescriptor = + new ValueStateDescriptor<List<Tuple2<RowData, List<Long>>>>( + "sortedListState", listTypeInfo); + sortedListState = getRuntimeContext().getState(sortedListStateDescriptor); + + // Initialize state which maintains the actual row + MapStateDescriptor<Long, RowData> valueStateDescriptor = + new MapStateDescriptor<Long, RowData>( + "valueMapState", Types.LONG, inputRowTypeInfo); + valueMapState = getRuntimeContext().getMapState(valueStateDescriptor); + + // Initialize accumulator state per row + InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(accTypes); + MapStateDescriptor<RowData, RowData> accStateDescriptor = + new 
MapStateDescriptor<RowData, RowData>( + "accMapState", sortKeyRowTypeInfo, accTypeInfo); + accMapState = getRuntimeContext().getMapState(accStateDescriptor); + + initCleanupTimeState("NonTimeUnboundedPrecedingFunctionCleanupTime"); + } + + /** + * Puts an element from the input stream into state. Emits the aggregated value for the newly + * inserted element. For append stream emits updates(UB, UA) for all elements which are present + * after the newly inserted element. For retract stream emits a DELETE for the element and + * thereafter emits updates(UB, UA) for all elements which are present after the retracted + * element. Emits the same aggregated value for all elements with the same sortKey to comply + * with the sql RANGE syntax. + * + * @param input The input value. + * @param ctx A {@link Context} that allows querying the timestamp of the element and getting + * TimerService for registering timers and querying the time. The context is only valid + * during the invocation of this method, do not store it. + * @param out The collector for returning result values. + * @throws Exception + */ + @Override + public void processElement( + RowData input, + KeyedProcessFunction<K, RowData, RowData>.Context ctx, + Collector<RowData> out) + throws Exception { + // register state-cleanup timer + registerProcessingCleanupTimer(ctx, ctx.timerService().currentProcessingTime()); + + RowKind rowKind = input.getRowKind(); + + switch (rowKind) { + case INSERT: + case UPDATE_AFTER: + insertIntoSortedList(input, out); + break; + + case DELETE: + case UPDATE_BEFORE: + removeFromSortedList(input, out); + break; + } + + // Reset acc state since we can have out of order inserts into the ordered list + currFunction.resetAccumulators(); + currFunction.cleanup(); + } + + /** + * Adds a new element(input) to a sortedList. 
The sortedList contains a list of Tuple<SortKey, + * List<Long Ids>>. Extracts the inputSortKey from the input and compares it with every element + * in the sortedList. If a sortKey already exists in the sortedList for the input, add the id to + * the list of ids and update the sortedList. Otherwise, find the right position in the sortedList + * and add a new entry in the middle. If no matching sortKey is found, add a new entry at the + * end of the sortedList. Review Comment: Fixed javadoc -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
