Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4380#discussion_r129837121
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeIdentitySortProcessFunctionOffsetFetch.scala ---
    @@ -0,0 +1,110 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import org.apache.flink.api.common.state.{ ListState, ListStateDescriptor }
    +import org.apache.flink.api.java.typeutils.RowTypeInfo
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.runtime.state.{ FunctionInitializationContext, FunctionSnapshotContext }
    +import org.apache.flink.streaming.api.functions.ProcessFunction
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.{ Collector, Preconditions }
    +import org.apache.flink.api.common.state.ValueState
    +import org.apache.flink.api.common.state.ValueStateDescriptor
    +import scala.util.control.Breaks._
    +import org.apache.flink.api.java.tuple.{ Tuple2 => JTuple2 }
    +import org.apache.flink.api.common.state.MapState
    +import org.apache.flink.api.common.state.MapStateDescriptor
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.api.java.typeutils.ListTypeInfo
    +import java.util.Comparator
    +import java.util.ArrayList
    +import java.util.Collections
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.table.runtime.types.{CRow, CRowTypeInfo}
    +
    +/**
    + * ProcessFunction to sort elements based solely on proctime with offset/fetch,
    + * operating on a [[org.apache.flink.streaming.api.datastream.DataStream]].
    + *
    + * @param offset Number of elements to be skipped in the current context
    + *               (an offset of 0 applies only the fetch)
    + * @param fetch Number of elements to be emitted in the current context
    + * @param inputRowType Type information of the incoming data
    + */
    +class ProcTimeIdentitySortProcessFunctionOffsetFetch(
    +  private val offset: Int,
    +  private val fetch: Int,
    +  private val inputRowType: CRowTypeInfo)
    +    extends ProcessFunction[CRow, CRow] {
    +
    +  private var stateEventsBuffer: ListState[Row] = _
    +  
    +  private var outputC: CRow = _
    +  private val adjustedFetchLimit = offset + fetch
    +  
    +  override def open(config: Configuration) {
    +    val sortDescriptor = new ListStateDescriptor[Row]("sortState", inputRowType.rowType)
    +    stateEventsBuffer = getRuntimeContext.getListState(sortDescriptor)
    +
    +    val arity: Int = inputRowType.getArity
    +    if (outputC == null) {
    +      outputC = new CRow(new Row(arity), true)
    +    }
    +  }
    +
    +  override def processElement(
    --- End diff --
    
    This operator should be implemented as follows:
    
    ```
    val fetchCnt = fetchCount.value
    if (fetch == -1 || fetchCnt < fetch) {
      // we haven't fetched enough rows yet
      val offsetCnt = offsetCount.value
      if (offsetCnt < offset) {
        // we haven't skipped enough rows yet:
        // increment the counter and skip the row
        offsetCount.update(offsetCnt + 1)
      } else {
        // forward the row
        out.collect(inputC)
        if (fetch != -1) {
          fetchCount.update(fetchCnt + 1)
        }
      }
    } else {
      // we have fetched enough rows: drop the row and return
    }
    ```
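
    For completeness, a minimal sketch of how the two counters could be kept in fault-tolerant state (the names `offsetCount` and `fetchCount` match the snippet above and are just suggestions):

    ```
    private var offsetCount: ValueState[Int] = _
    private var fetchCount: ValueState[Int] = _

    override def open(config: Configuration): Unit = {
      // ValueState is checkpointed, so the counters survive failures and restarts
      offsetCount = getRuntimeContext.getState(
        new ValueStateDescriptor[Int]("offsetCount", classOf[Int]))
      fetchCount = getRuntimeContext.getState(
        new ValueStateDescriptor[Int]("fetchCount", classOf[Int]))
    }
    ```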
    
    As you'll notice, `ORDER BY proctime ASC FETCH FIRST x ROWS ONLY` is quite pointless because it will only emit x rows and then nothing more. However, those are the correct semantics here. `OFFSET` is similar because it won't emit the first x rows, which is not really meaningful in a streaming context either.
    
    The other combinations are basically the same. The only difference is that they do a bit more sorting to identify the rows that have to be dropped. The sorting operators have to do the sorting in `onTimer()` as before, but each record has to pass the offset/fetch check before being emitted. Once the fetch count is reached, we also don't need to put rows into state anymore and can simply drop them, as sketched below.
    The reason why we need counters for offset and fetch is that this information must not get lost in case of a failure. Otherwise, a job might emit rows after a failure even though it had already emitted enough rows before the failure.
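
    To make that concrete, here is a rough sketch of how the emission loop in `onTimer()` could apply the check (`sortBufferedRows()` is a placeholder for the existing sorting logic, and `offsetCount`/`fetchCount` are the counters declared above):

    ```
    override def onTimer(
        timestamp: Long,
        ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
        out: Collector[CRow]): Unit = {

      // sort the buffered rows as before (details omitted, placeholder helper)
      val sortedRows: java.util.List[Row] = sortBufferedRows()

      val iter = sortedRows.iterator()
      var offsetCnt = offsetCount.value
      var fetchCnt = fetchCount.value
      // emit until the fetch limit is reached, skipping the first offset rows
      while (iter.hasNext && (fetch == -1 || fetchCnt < fetch)) {
        val row = iter.next()
        if (offsetCnt < offset) {
          offsetCnt += 1 // skip the row
        } else {
          outputC.row = row
          out.collect(outputC)
          fetchCnt += 1
        }
      }
      // write the counters back so they are part of the next checkpoint
      offsetCount.update(offsetCnt)
      fetchCount.update(fetchCnt)
    }
    ```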
    
    Please adjust all operators and tests to the correct semantics. Thank you.

