[ https://issues.apache.org/jira/browse/FLINK-6968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16486827#comment-16486827 ]
ASF GitHub Bot commented on FLINK-6968: --------------------------------------- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190137656 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sinks/QueryableTableSink.scala --- @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.sinks + +import java.lang.{Boolean => JBool} + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.functions.KeySelector +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.{ResultTypeQueryable, RowTypeInfo} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.streaming.api.{TimeCharacteristic, TimeDomain} +import org.apache.flink.table.api.StreamQueryConfig +import org.apache.flink.table.runtime.aggregate.KeyedProcessFunctionWithCleanupState +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +/** + * A QueryableTableSink stores a table in queryable state. + * + * This class stores a table in queryable state so that users can access table data without + * dependency on external storage. + * + * Since this is only a kv storage, currently users can only do point queries against it. + * + * Example: + * {{{ + * val env = ExecutionEnvironment.getExecutionEnvironment + * val tEnv = TableEnvironment.getTableEnvironment(env) + * + * val table: Table = ... + * + * val queryableTableSink: QueryableTableSink = new QueryableTableSink( + * "prefix", + * queryConfig, + * None) + * + * tEnv.writeToSink(table, queryableTableSink, config) + * }}} + * + * When the program starts to run, users can access state with QueryableStateClient. 
+ * {{{ + * val client = new QueryableStateClient(tmHostname, proxyPort) + * val data = client.getKvState( + * jobId, + * "prefix-column1", + * Row.of(1), + * new RowTypeInfo(Array(TypeInformation.of(classOf[Int])), Array("id")), + * stateDescriptor) + * .get(); + * + * }}} + * + * + * @param namePrefix + * @param queryConfig + * @param cleanupTimeDomain + */ +class QueryableTableSink( + private val namePrefix: String, + private val queryConfig: StreamQueryConfig, + private val cleanupTimeDomain: Option[TimeDomain]) + extends UpsertStreamTableSink[Row] + with TableSinkBase[JTuple2[JBool, Row]] { + private var keys: Array[String] = _ + + override def setKeyFields(keys: Array[String]): Unit = { + if (keys == null) { + throw new IllegalArgumentException("keys can't be null!") + } + this.keys = keys + } + + override def setIsAppendOnly(isAppendOnly: JBool): Unit = { + if (isAppendOnly) { + throw new IllegalArgumentException("A QueryableTableSink can not be used with append-only " + + "tables as the table would grow infinitely") + } + } + + override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) + + override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): Unit = { + val keyIndices = keys.map(getFieldNames.indexOf(_)) + val keyTypes = keyIndices.map(getFieldTypes(_)) + + val keySelectorType = new RowTypeInfo(keyTypes, keys) + + val processFunction = new QueryableStateProcessFunction( + namePrefix, + queryConfig, + keys, + getFieldNames, + getFieldTypes, + calculateCleanupTimeDomain(dataStream.getExecutionEnvironment.getStreamTimeCharacteristic)) + + dataStream.keyBy(new RowKeySelector(keyIndices, keySelectorType)) + .process(processFunction) + } + + private def calculateCleanupTimeDomain(timeCharacteristic: TimeCharacteristic): TimeDomain = { + val timeDomainFromTimeCharacteristic = { + timeCharacteristic match { + case TimeCharacteristic.IngestionTime | TimeCharacteristic.ProcessingTime => + 
TimeDomain.PROCESSING_TIME + case TimeCharacteristic.EventTime => + TimeDomain.EVENT_TIME + } + } + + cleanupTimeDomain.getOrElse(timeDomainFromTimeCharacteristic) + } + + override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = { + new QueryableTableSink(this.namePrefix, this.queryConfig, this.cleanupTimeDomain) + } +} + +class RowKeySelector( + private val keyIndices: Array[Int], + @transient private val returnType: TypeInformation[Row]) + extends KeySelector[JTuple2[JBool, Row], Row] + with ResultTypeQueryable[Row] { + + override def getKey(value: JTuple2[JBool, Row]): Row = { + val keys = keyIndices + + val srcRow = value.f1 + + val destRow = new Row(keys.length) + var i = 0 + while (i < keys.length) { + destRow.setField(i, srcRow.getField(keys(i))) + i += 1 + } + + destRow + } + + override def getProducedType: TypeInformation[Row] = returnType +} + +class QueryableStateProcessFunction( --- End diff -- Separate class? > Store streaming, updating tables with unique key in queryable state > ------------------------------------------------------------------- > > Key: FLINK-6968 > URL: https://issues.apache.org/jira/browse/FLINK-6968 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Renjie Liu > Priority: Major > > Streaming tables with unique key are continuously updated. For example > queries with a non-windowed aggregation generate such tables. Commonly, such > updating tables are emitted via an upsert table sink to an external datastore > (k-v store, database) to make it accessible to applications. > This issue is about adding a feature to store and maintain such a table as > queryable state in Flink. By storing the table in Flink's queryable state, we > do not need an external data store to access the results of the query but can > query the results directly from Flink. -- This message was sent by Atlassian JIRA (v7.6.3#76005)