This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6333fc7 [FLINK-11975][table-planner-blink] Support running a simple
select from values query (#8035)
6333fc7 is described below
commit 6333fc734f730e64c5f96390f345e7e0b1352662
Author: Jing Zhang <[email protected]>
AuthorDate: Tue Mar 26 20:58:09 2019 +0800
[FLINK-11975][table-planner-blink] Support running a simple select from
values query (#8035)
---
.../flink/table/api/BatchTableEnvironment.scala | 66 +++++++
.../flink/table/api/StreamTableEnvironment.scala | 111 +++++++++++-
.../apache/flink/table/api/TableEnvironment.scala | 27 +++
.../org/apache/flink/table/api/TableImpl.scala | 19 +-
.../scala/org/apache/flink/table/api/Types.scala | 200 +++++++++++++++++++++
.../table/api/scala/StreamTableEnvironment.scala | 47 +++++
.../flink/table/api/scala/TableConversions.scala | 100 +++++++++++
.../org/apache/flink/table/api/scala/package.scala | 4 +
.../flink/table/calcite/FlinkTypeFactory.scala | 1 +
.../apache/flink/table/codegen/CodeGenUtils.scala | 9 +
.../table/codegen/InputFormatCodeGenerator.scala | 93 ++++++++++
.../SinkCodeGenerator.scala} | 38 ++--
.../flink/table/codegen/ValuesCodeGenerator.scala | 63 +++++++
.../plan/nodes/physical/batch/BatchExecSink.scala | 74 +++++++-
.../nodes/physical/batch/BatchExecValues.scala | 31 +++-
.../nodes/physical/stream/StreamExecSink.scala | 119 +++++++++++-
.../nodes/physical/stream/StreamExecValues.scala | 40 ++++-
.../flink/table/plan/optimize/Optimizer.scala | 2 +-
.../table/plan/rules/FlinkBatchRuleSets.scala | 3 +-
.../table/plan/rules/FlinkStreamRuleSets.scala | 3 +-
.../rules/physical/batch/BatchExecSinkRule.scala | 54 ++++++
.../rules/physical/stream/StreamExecSinkRule.scala | 54 ++++++
...TableSink.scala => AppendStreamTableSink.scala} | 16 +-
...tStreamTableSink.scala => BatchTableSink.scala} | 16 +-
.../flink/table/sinks/CollectTableSink.scala | 83 +++++++++
.../flink/table/sinks/DataStreamTableSink.scala | 65 +++++++
...ableSink.scala => RetractStreamTableSink.scala} | 17 +-
.../flink/table/sinks/UpsertStreamTableSink.scala | 70 ++++++++
.../org/apache/flink/table/util/BaseRowUtil.java | 89 +++++++++
.../table/runtime/batch/sql/ValuesITCase.scala} | 27 +--
.../table/runtime/stream/sql/ValuesITCase.scala | 51 ++++++
.../flink/table/runtime/utils/BatchTestBase.scala | 85 +++++++++
.../flink/table/runtime/utils/StreamTestSink.scala | 134 ++++++++++++++
.../table/runtime/utils/StreamingTestBase.scala | 55 ++++++
.../apache/flink/table/api/TableConfigOptions.java | 19 ++
.../apache/flink/table/dataformat/GenericRow.java | 4 +
.../flink/table/generated/GeneratedClass.java | 12 ++
.../table/runtime/values/ValuesInputFormat.java | 75 ++++++++
.../flink/table/typeutils/BaseRowTypeInfo.java | 5 +
39 files changed, 1912 insertions(+), 69 deletions(-)
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 95838cf..8991aef 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -17,9 +17,13 @@
*/
package org.apache.flink.table.api
+import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.plan.nodes.calcite.LogicalSink
+import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
import org.apache.flink.table.plan.optimize.{BatchOptimizer, Optimizer}
import org.apache.flink.table.plan.schema.{BatchTableSourceTable,
TableSourceSinkTable, TableSourceTable}
@@ -32,6 +36,8 @@ import org.apache.calcite.plan.ConventionTraitDef
import org.apache.calcite.rel.RelCollationTraitDef
import org.apache.calcite.sql.SqlExplainLevel
+import _root_.scala.collection.JavaConversions._
+
/**
* A session to construct between [[Table]] and [[DataStream]], its main
function is:
*
@@ -80,6 +86,66 @@ class BatchTableEnvironment(
}
/**
+ * Merge global job parameters and table config parameters,
+ * and set the merged result to GlobalJobParameters
+ */
+ private def mergeParameters(): Unit = {
+ if (streamEnv != null && streamEnv.getConfig != null) {
+ val parameters = new Configuration()
+ if (config != null && config.getConf != null) {
+ parameters.addAll(config.getConf)
+ }
+
+ if (streamEnv.getConfig.getGlobalJobParameters != null) {
+ streamEnv.getConfig.getGlobalJobParameters.toMap.foreach {
+ kv => parameters.setString(kv._1, kv._2)
+ }
+ }
+
+ streamEnv.getConfig.setGlobalJobParameters(parameters)
+ }
+ }
+
+ /**
+ * Writes a [[Table]] to a [[TableSink]].
+ *
+ * Internally, the [[Table]] is translated into a [[DataStream]] and handed
over to the
+ * [[TableSink]] to write it.
+ *
+ * @param table The [[Table]] to write.
+ * @param sink The [[TableSink]] to write the [[Table]] to.
+ * @tparam T The expected type of the [[DataStream]] which represents the
[[Table]].
+ */
+ override private[table] def writeToSink[T](
+ table: Table,
+ sink: TableSink[T],
+ sinkName: String): Unit = {
+ mergeParameters()
+
+ val sinkNode =
LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+ val optimizedPlan = optimize(sinkNode)
+ val optimizedNodes = translateNodeDag(Seq(optimizedPlan))
+ require(optimizedNodes.size() == 1)
+ translateToPlan(optimizedNodes.head)
+ }
+
+ /**
+ * Translates a [[BatchExecNode]] plan into a [[StreamTransformation]].
+ * Converts to target type if necessary.
+ *
+ * @param node The plan to translate.
+ * @return The [[StreamTransformation]] that corresponds to the given node.
+ */
+ private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] =
{
+ node match {
+ case node: BatchExecNode[_] => node.translateToPlan(this)
+ case _ =>
+ throw new TableException("Cannot generate BoundedStream due to an
invalid logical plan. " +
+ "This is a bug and should not happen.
Please file an issue.")
+ }
+ }
+
+ /**
* Returns the AST of the specified Table API and SQL queries and the
execution plan to compute
* the result of the given [[Table]].
*
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f285214..abb82ce 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -18,15 +18,24 @@
package org.apache.flink.table.api
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.runtime.state.memory.MemoryStateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.transformations.StreamTransformation
import org.apache.flink.table.calcite.FlinkRelBuilder
-import org.apache.flink.table.plan.`trait`.{AccModeTraitDef,
FlinkRelDistributionTraitDef, MiniBatchIntervalTraitDef,
UpdateAsRetractionTraitDef}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.calcite.LogicalSink
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
import org.apache.flink.table.plan.optimize.{Optimizer, StreamOptimizer}
import org.apache.flink.table.plan.schema._
import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.`trait`.{AccModeTraitDef,
FlinkRelDistributionTraitDef, MiniBatchIntervalTraitDef,
UpdateAsRetractionTraitDef}
import org.apache.flink.table.plan.util.FlinkRelOptUtil
+import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
@@ -34,6 +43,8 @@ import org.apache.calcite.plan.ConventionTraitDef
import org.apache.calcite.rel.RelCollationTraitDef
import org.apache.calcite.sql.SqlExplainLevel
+import _root_.scala.collection.JavaConversions._
+
/**
* The base class for stream TableEnvironments.
*
@@ -60,6 +71,8 @@ abstract class StreamTableEnvironment(
// the naming pattern for internally registered tables.
private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
+ private var isConfigMerged: Boolean = false
+
override def queryConfig: StreamQueryConfig = new StreamQueryConfig
override protected def getOptimizer: Optimizer = new StreamOptimizer(this)
@@ -92,6 +105,102 @@ abstract class StreamTableEnvironment(
)
/**
+ * Merge global job parameters and table config parameters,
+ * and set the merged result to GlobalJobParameters
+ */
+ private def mergeParameters(): Unit = {
+ if (!isConfigMerged && execEnv != null && execEnv.getConfig != null) {
+ val parameters = new Configuration()
+ if (config != null && config.getConf != null) {
+ parameters.addAll(config.getConf)
+ }
+
+ if (execEnv.getConfig.getGlobalJobParameters != null) {
+ execEnv.getConfig.getGlobalJobParameters.toMap.foreach {
+ kv => parameters.setString(kv._1, kv._2)
+ }
+ }
+ val isHeapState = Option(execEnv.getStateBackend) match {
+ case Some(backend) if backend.isInstanceOf[MemoryStateBackend] ||
+ backend.isInstanceOf[FsStateBackend]=> true
+ case None => true
+ case _ => false
+ }
+ parameters.setBoolean(TableConfigOptions.SQL_EXEC_STATE_BACKEND_ON_HEAP,
isHeapState)
+ execEnv.getConfig.setGlobalJobParameters(parameters)
+ isConfigMerged = true
+ }
+ }
+
+ /**
+ * Writes a [[Table]] to a [[TableSink]].
+ *
+ * Internally, the [[Table]] is translated into a [[DataStream]] and handed
over to the
+ * [[TableSink]] to write it.
+ *
+ * @param table The [[Table]] to write.
+ * @param sink The [[TableSink]] to write the [[Table]] to.
+ * @tparam T The expected type of the [[DataStream]] which represents the
[[Table]].
+ */
+ override private[table] def writeToSink[T](
+ table: Table,
+ sink: TableSink[T],
+ sinkName: String): Unit = {
+ val sinkNode =
LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+ translateSink(sinkNode)
+ }
+
+ /**
+ * Translates a [[Table]] into a [[DataStream]].
+ *
+ * The transformation involves optimizing the relational expression tree as
defined by
+ * Table API calls and / or SQL queries and generating corresponding
[[DataStream]] operators.
+ *
+ * @param table The root node of the relational expression
tree.
+ * @param updatesAsRetraction Set to true to encode updates as retraction
messages.
+ * @param withChangeFlag Set to true to emit records with change flags.
+ * @param resultType The
[[org.apache.flink.api.common.typeinfo.TypeInformation]] of
+ * the resulting [[DataStream]].
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The [[DataStream]] that corresponds to the translated [[Table]].
+ */
+ protected def translateToDataStream[T](
+ table: Table,
+ updatesAsRetraction: Boolean,
+ withChangeFlag: Boolean,
+ resultType: TypeInformation[T]): DataStream[T] = {
+ val sink = new DataStreamTableSink[T](table, resultType,
updatesAsRetraction, withChangeFlag)
+ val sinkName = createUniqueTableName()
+ val sinkNode =
LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+ val transformation = translateSink(sinkNode)
+ new DataStream(execEnv, transformation).asInstanceOf[DataStream[T]]
+ }
+
+ private def translateSink(sink: LogicalSink): StreamTransformation[_] = {
+ mergeParameters()
+
+ val optimizedPlan = optimize(sink)
+ val optimizedNodes = translateNodeDag(Seq(optimizedPlan))
+ require(optimizedNodes.size() == 1)
+ translateToPlan(optimizedNodes.head)
+ }
+
+ /**
+ * Translates a [[StreamExecNode]] plan into a [[StreamTransformation]].
+ *
+ * @param node The plan to translate.
+ * @return The [[StreamTransformation]] of type [[BaseRow]].
+ */
+ private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] =
{
+ node match {
+ case node: StreamExecNode[_] => node.translateToPlan(this)
+ case _ =>
+ throw new TableException("Cannot generate DataStream due to an invalid
logical plan. " +
+ "This is a bug and should not happen.
Please file an issue.")
+ }
+ }
+
+ /**
* Returns the AST of the specified Table API and SQL queries and the
execution plan to compute
* the result of the given [[Table]].
*
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 4c2bdc4..82fe8e4 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.api
+import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
@@ -25,9 +26,12 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkPlannerImpl,
FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
import org.apache.flink.table.plan.cost.FlinkCostFactory
+import org.apache.flink.table.plan.nodes.exec.ExecNode
+import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
import org.apache.flink.table.plan.optimize.Optimizer
import org.apache.flink.table.plan.schema.RelTable
import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.sources.TableSource
import org.apache.flink.types.Row
@@ -152,6 +156,18 @@ abstract class TableEnvironment(val config: TableConfig) {
protected def getOptimizer: Optimizer
/**
+ * Writes a [[Table]] to a [[TableSink]].
+ *
+ * @param table The [[Table]] to write.
+ * @param sink The [[TableSink]] to write the [[Table]] to.
+ * @tparam T The data type that the [[TableSink]] expects.
+ */
+ private[table] def writeToSink[T](
+ table: Table,
+ sink: TableSink[T],
+ sinkName: String = null): Unit
+
+ /**
* Generates the optimized [[RelNode]] dag from the original relational
nodes.
*
* @param roots The root nodes of the relational expression tree.
@@ -172,6 +188,17 @@ abstract class TableEnvironment(val config: TableConfig) {
private[flink] def optimize(root: RelNode): RelNode =
optimize(Seq(root)).head
/**
+ * Convert [[org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel]]
DAG
+ * to [[ExecNode]] DAG and translate them.
+ */
+ @VisibleForTesting
+ private[flink] def translateNodeDag(rels: Seq[RelNode]): Seq[ExecNode[_, _]]
= {
+ require(rels.nonEmpty && rels.forall(_.isInstanceOf[FlinkPhysicalRel]))
+ // convert FlinkPhysicalRel DAG to ExecNode DAG
+ rels.map(_.asInstanceOf[ExecNode[_, _]])
+ }
+
+ /**
* Registers a [[Table]] under a unique name in the TableEnvironment's
catalog.
* Registered tables can be referenced in SQL queries.
*
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
index a025f45..4d58908 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
@@ -18,9 +18,14 @@
package org.apache.flink.table.api
-import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.calcite.FlinkTypeFactory._
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions.TemporalTableFunction
+import
org.apache.flink.table.`type`.TypeConverters.createInternalTypeInfoFromInternalType
+
+import org.apache.calcite.rel.RelNode
+
+import _root_.scala.collection.JavaConversions._
/**
* The implementation of the [[Table]].
@@ -35,12 +40,22 @@ import
org.apache.flink.table.functions.TemporalTableFunction
*/
class TableImpl(val tableEnv: TableEnvironment, relNode: RelNode) extends
Table {
+ private lazy val tableSchema: TableSchema = {
+ val rowType = relNode.getRowType
+ val fieldNames = rowType.getFieldList.map(_.getName)
+ val fieldTypes = rowType.getFieldList map { tp =>
+ val internalType = toInternalType(tp.getType)
+ createInternalTypeInfoFromInternalType(internalType)
+ }
+ new TableSchema(fieldNames.toArray, fieldTypes.toArray)
+ }
+
/**
* Returns the Calcite RelNode represent this Table.
*/
def getRelNode: RelNode = relNode
- override def getSchema: TableSchema = ???
+ override def getSchema: TableSchema = tableSchema
override def printSchema(): Unit = ???
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Types.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Types.scala
new file mode 100644
index 0000000..2e7d81d
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Types.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo,
TypeInformation, Types => JTypes}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo,
ObjectArrayTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.types.Row
+
+import _root_.java.{lang, math, sql, util}
+
+import _root_.scala.annotation.varargs
+
+/**
+ * This class enumerates all supported types of the Table API & SQL.
+ */
+object Types {
+
+ /**
+ * Returns type information for a Table API string or SQL VARCHAR type.
+ */
+ val STRING: TypeInformation[String] = JTypes.STRING
+
+ /**
+ * Returns type information for a Table API boolean or SQL BOOLEAN type.
+ */
+ val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+ /**
+ * Returns type information for a Table API byte or SQL TINYINT type.
+ */
+ val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+ /**
+ * Returns type information for a Table API short or SQL SMALLINT type.
+ */
+ val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+ /**
+ * Returns type information for a Table API integer or SQL INT/INTEGER type.
+ */
+ val INT: TypeInformation[lang.Integer] = JTypes.INT
+
+ /**
+ * Returns type information for a Table API long or SQL BIGINT type.
+ */
+ val LONG: TypeInformation[lang.Long] = JTypes.LONG
+
+ /**
+ * Returns type information for a Table API float or SQL FLOAT/REAL type.
+ */
+ val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+ /**
+ * Returns type information for a Table API double or SQL DOUBLE type.
+ */
+ val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
+
+ /**
+ * Returns type information for a Table API big decimal or SQL DECIMAL type.
+ */
+ val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+ /**
+ * Returns type information for a Table API SQL date or SQL DATE type.
+ */
+ val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
+
+ /**
+ * Returns type information for a Table API SQL time or SQL TIME type.
+ */
+ val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME
+
+ /**
+ * Returns type information for a Table API SQL timestamp or SQL TIMESTAMP
type.
+ */
+ val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+ /**
+ * Returns type information for a Table API interval of months.
+ */
+ val INTERVAL_MONTHS: TypeInformation[lang.Integer] =
TimeIntervalTypeInfo.INTERVAL_MONTHS
+
+ /**
+ * Returns type information for a Table API interval of milliseconds.
+ */
+ val INTERVAL_MILLIS: TypeInformation[lang.Long] =
TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+ /**
+ * Returns type information for [[org.apache.flink.types.Row]] with fields
of the given types.
+ *
+ * A row is a variable-length, null-aware composite type for storing
multiple values in a
+ * deterministic field order. Every field can be null regardless of the
field's type.
+ * The type of row fields cannot be automatically inferred; therefore, it
is required to provide
+ * type information whenever a row is used.
+ *
+ * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code>
fields, however, all
+ * row instances must strictly adhere to the schema defined by the type
info.
+ *
+ * This method generates type information with fields of the given types;
the fields have
+ * the default names (f0, f1, f2 ..).
+ *
+ * @param types The types of the row fields, e.g., Types.STRING, Types.INT
+ */
+ @varargs
+ def ROW(types: TypeInformation[_]*): TypeInformation[Row] = {
+ JTypes.ROW(types: _*)
+ }
+
+ /**
+ * Returns type information for [[org.apache.flink.types.Row]] with fields
of the given types
+ * and with given names.
+ *
+ * A row is a variable-length, null-aware composite type for storing
multiple values in a
+ * deterministic field order. Every field can be null independent of the
field's type.
+ * The type of row fields cannot be automatically inferred; therefore, it
is required to provide
+ * type information whenever a row is used.
+ *
+ * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code>
fields, however, all
+ * row instances must strictly adhere to the schema defined by the type
info.
+ *
+ * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING,
Types.INT))`.
+ *
+ * @param fieldNames array of field names
+ * @param types array of field types
+ */
+ def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]):
TypeInformation[Row] = {
+ JTypes.ROW_NAMED(fieldNames, types: _*)
+ }
+
+ /**
+ * Generates type information for an array consisting of Java primitive
elements. The elements
+ * do not support null values.
+ *
+ * @param elementType type of the array elements; e.g. Types.INT
+ */
+ def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
+ elementType match {
+ case BOOLEAN => PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+ case BYTE => PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+ case SHORT => PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+ case INT => PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+ case LONG => PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+ case FLOAT => PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+ case DOUBLE => PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+ case _ =>
+ throw new TableException(s"$elementType cannot be an element of a
primitive array." +
+ s"Only Java primitive types are supported.")
+ }
+ }
+
+ /**
+ * Generates type information for an array consisting of Java object
elements. Null values for
+ * elements are supported.
+ *
+ * @param elementType type of the array elements; e.g. Types.STRING or
Types.INT
+ */
+ def OBJECT_ARRAY[E](elementType: TypeInformation[E]):
TypeInformation[Array[E]] = {
+ ObjectArrayTypeInfo.getInfoFor(elementType)
+ }
+
+ /**
+ * Generates type information for a Java HashMap. Null values in keys are
not supported. An
+ * entry's value can be null.
+ *
+ * @param keyType type of the keys of the map e.g. Types.STRING
+ * @param valueType type of the values of the map e.g. Types.STRING
+ */
+ def MAP[K, V](
+ keyType: TypeInformation[K],
+ valueType: TypeInformation[V]): TypeInformation[util.Map[K, V]] = {
+ new MapTypeInfo(keyType, valueType)
+ }
+
+ /**
+ * Generates type information for a Multiset. A Multiset is backed by a Java
HashMap and maps an
+ * arbitrary key to an integer value. Null values in keys are not supported.
+ *
+ * @param elementType type of the elements of the multiset e.g. Types.STRING
+ */
+ def MULTISET[E](elementType: TypeInformation[E]):
TypeInformation[util.Map[E, lang.Integer]] = {
+ new MultisetTypeInfo(elementType)
+ }
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 04df0cf..136ea4d 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -17,6 +17,10 @@
*/
package org.apache.flink.table.api.scala
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.asScalaStream
+import org.apache.flink.streaming.api.scala.{createTypeInformation => _, _}
import org.apache.flink.streaming.api.scala.{DataStream,
StreamExecutionEnvironment}
import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
@@ -127,6 +131,49 @@ class StreamTableEnvironment @deprecated(
registerDataStreamInternal(name, dataStream.javaStream, exprs)
}
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a
specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]]
is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as
follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are
mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field
types must match.
+ *
+ * @param table The [[Table]] to convert.
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
+ val returnType = createTypeInformation[T]
+ asScalaStream(translateToDataStream[T](
+ table,
+ updatesAsRetraction = false,
+ withChangeFlag = false,
+ returnType))
+ }
+
+ /**
+ * Converts the given [[Table]] into a [[DataStream]] of add and retract
messages.
+ * The message will be encoded as [[Tuple2]]. The first field is a
[[Boolean]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[Boolean]] flag indicates an add message, a false flag indicates
a retract message.
+ *
+ * @param table The [[Table]] to convert.
+ * @tparam T The type of the requested data type.
+ * @return The converted [[DataStream]].
+ */
+ def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean,
T)] = {
+ val returnType = createTypeInformation[(Boolean, T)]
+ asScalaStream(translateToDataStream[(Boolean, T)](
+ table,
+ updatesAsRetraction = true,
+ withChangeFlag = true,
+ returnType))
+ }
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
new file mode 100644
index 0000000..d565c85
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.scala.{StreamTableEnvironment =>
ScalaStreamTableEnv}
+import org.apache.flink.table.api.{Table, TableException, TableImpl}
+
+/**
+ * Holds methods to convert a [[Table]] into a
[[DataStream]].
+ *
+ * @param table The table to convert.
+ */
+class TableConversions(table: Table) {
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a
specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]]
is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as
follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are
mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field
types must match.
+ *
+ * NOTE: This method only supports conversion of append-only tables. In
order to make this
+ * more explicit in the future, please use [[toAppendStream()]] instead.
+ * If add and retract messages are required, use [[toRetractStream()]].
+ *
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ @deprecated("This method only supports conversion of append-only tables. In
order to make this" +
+ " more explicit in the future, please use toAppendStream() instead.")
+ def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream
+
+ /**
+ * Converts the given [[Table]] into an append [[DataStream]] of a
specified type.
+ *
+ * The [[Table]] must only have insert (append) changes. If the [[Table]]
is also modified
+ * by update or delete changes, the conversion will fail.
+ *
+ * The fields of the [[Table]] are mapped to [[DataStream]] fields as
follows:
+ * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are
mapped by position, field
+ * types must match.
+ * - POJO [[DataStream]] types: Fields are mapped by field name, field
types must match.
+ *
+ * @tparam T The type of the resulting [[DataStream]].
+ * @return The converted [[DataStream]].
+ */
+ def toAppendStream[T: TypeInformation]: DataStream[T] = {
+ table.asInstanceOf[TableImpl].tableEnv match {
+ case tEnv: ScalaStreamTableEnv =>
+ tEnv.toAppendStream(table)
+ case _ =>
+ throw new TableException(
+ "Only tables that originate from Scala DataStreams " +
+ "can be converted to Scala DataStreams.")
+ }
+ }
+
+ /** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
+ * The message will be encoded as [[Tuple2]]. The first field is a
[[Boolean]] flag,
+ * the second field holds the record of the specified type [[T]].
+ *
+ * A true [[Boolean]] flag indicates an add message, a false flag indicates
a retract message.
+ *
+ */
+ def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = {
+ table.asInstanceOf[TableImpl].tableEnv match {
+ case tEnv: ScalaStreamTableEnv =>
+ tEnv.toRetractStream(table)
+ case _ =>
+ throw new TableException(
+ "Only tables that originate from Scala DataStreams " +
+ "can be converted to Scala DataStreams.")
+ }
+ }
+
+}
+
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
index 20fbe8e..eb2ff27 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -60,6 +60,10 @@ import _root_.scala.language.implicitConversions
*/
package object scala {
+  /** Implicitly wraps a [[Table]] in a [[TableConversions]] instance. */
+  implicit def table2TableConversions(table: Table): TableConversions =
+    new TableConversions(table)
+
implicit def dataStream2DataStreamConversions[T](set: DataStream[T]):
DataStreamConversions[T] = {
new DataStreamConversions[T](set, set.dataType)
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 43826c0..81fea7d 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.calcite
import org.apache.flink.table.`type`._
import org.apache.flink.table.api.{TableException, TableSchema}
import org.apache.flink.table.plan.schema.{GenericRelDataType, _}
+
import org.apache.calcite.avatica.util.TimeUnit
import org.apache.calcite.jdbc.JavaTypeFactoryImpl
import org.apache.calcite.rel.`type`._
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index b27280e..4e7003d 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.`type`._
import org.apache.flink.table.dataformat.DataFormatConverters.IdentityConverter
import org.apache.flink.table.dataformat.{Decimal, _}
import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.types.Row
import java.lang.reflect.Method
import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar,
Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short =>
JShort}
@@ -163,6 +164,14 @@ object CodeGenUtils {
case _ => "null"
}
+  /**
+    * Tells whether `clazz` is compatible with the internal data format, in which case no
+    * data structure converter is needed.
+    *
+    * `Object` and [[Row]] are never treated as internal (original note: Row can only infer
+    * GenericType[Row]). Any other class is internal when it is a [[BaseRow]] (sub)class or
+    * when it equals the type class of `t`.
+    *
+    * @param clazz class to check
+    * @param t     type information whose type class `clazz` is compared with
+    * @return true if `clazz` is internally compatible
+    */
+  def isInternalClass(clazz: Class[_], t: TypeInformation[_]): Boolean = {
+    val neverInternal = clazz == classOf[Object] || clazz == classOf[Row]
+    !neverInternal &&
+      (classOf[BaseRow].isAssignableFrom(clazz) || clazz == t.getTypeClass)
+  }
+
// -------------------------- Method & Enum
---------------------------------------
def qualifyMethod(method: Method): String =
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
new file mode 100644
index 0000000..7a41f13
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.table.codegen.CodeGenUtils.newName
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.dataformat.GenericRow
+import org.apache.flink.table.generated.GeneratedInput
+import org.apache.flink.table.`type`.InternalType
+
+/**
+ * A code generator for generating Flink [[GenericInputFormat]]s.
+ */
+object InputFormatCodeGenerator {
+
+ /**
+ * Generates the Java source of an input format that emits a fixed list of records,
+ * one per `nextRecord` call, and wraps it in a [[GeneratedInput]].
+ *
+ * @param ctx The code generator context; its reusable member/init code and its references
+ * are embedded into the result.
+ * @param name Base name of the generated class. It is passed to `newName`, so it does not
+ * need to be unique, but it has to be a valid Java class identifier.
+ * @param records Code snippets, one per record; snippet `i` becomes `case i` of the switch
+ * in the generated `nextRecord` method.
+ * @param returnType Type of the reusable output record registered with the context.
+ * @param outRecordTerm Term of the output record; it is what the generated `nextRecord`
+ * returns.
+ * @param outRecordWriterTerm Term of the output record writer, registered together with
+ * the output record.
+ * @tparam T Element type of the generated [[GenericInputFormat]].
+ * @return A [[GeneratedInput]] holding the class name, the generated code and the
+ * references of `ctx`.
+ */
+ def generateValuesInputFormat[T](
+ ctx: CodeGeneratorContext,
+ name: String,
+ records: Seq[String],
+ returnType: InternalType,
+ outRecordTerm: String = CodeGenUtils.DEFAULT_OUT_RECORD_TERM,
+ outRecordWriterTerm: String =
CodeGenUtils.DEFAULT_OUT_RECORD_WRITER_TERM)
+ : GeneratedInput[GenericInputFormat[T]] = {
+ val funcName = newName(name)
+
+ // register the reusable output record (and its writer term) with the context before the
+ // member and init code of the context are rendered into the template below
+ ctx.addReusableOutputRecord(returnType, classOf[GenericRow], outRecordTerm,
+ Some(outRecordWriterTerm))
+
+ // Java source of the input format: nextIdx selects the record snippet to run and is
+ // advanced on every nextRecord() call; reachedEnd() compares it with the record count
+ val funcCode = j"""
+ public class $funcName extends
${classOf[GenericInputFormat[_]].getCanonicalName} {
+
+ private int nextIdx = 0;
+
+ ${ctx.reuseMemberCode()}
+
+ public $funcName(Object[] references) throws Exception {
+ ${ctx.reuseInitCode()}
+ }
+
+ @Override
+ public boolean reachedEnd() throws java.io.IOException {
+ return nextIdx >= ${records.length};
+ }
+
+ @Override
+ public Object nextRecord(Object reuse) {
+ switch (nextIdx) {
+ ${records.zipWithIndex.map { case (r, i) =>
+ s"""
+ |case $i:
+ | $r
+ |break;
+ """.stripMargin
+ }.mkString("\n")}
+ }
+ nextIdx++;
+ return $outRecordTerm;
+ }
+ }
+ """.stripMargin
+
+ new GeneratedInput(funcName, funcCode, ctx.references.toArray)
+ }
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SinkCodeGenerator.scala
similarity index 51%
copy from
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
copy to
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SinkCodeGenerator.scala
index 059a971..f51803f 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SinkCodeGenerator.scala
@@ -16,30 +16,26 @@
* limitations under the License.
*/
-package org.apache.flink.table.plan.nodes.physical.batch
+package org.apache.flink.table.codegen
-import org.apache.flink.table.plan.nodes.calcite.Sink
-import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink}
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
+object SinkCodeGenerator {
-import java.util
-
-/**
- * Batch physical RelNode to to write data into an external sink defined by a
[[TableSink]].
- */
-class BatchExecSink[T](
- cluster: RelOptCluster,
- traitSet: RelTraitSet,
- inputRel: RelNode,
- sink: TableSink[T],
- sinkName: String)
- extends Sink(cluster, traitSet, inputRel, sink, sinkName)
- with BatchPhysicalRel {
-
- override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
- new BatchExecSink(cluster, traitSet, inputs.get(0), sink, sinkName)
+  /**
+    * Determines the class of the records that the given sink consumes.
+    *
+    * @param sink the table sink to inspect
+    * @return for a [[DataStreamTableSink]] the type class of its output type, otherwise the
+    *         type class reported by [[TypeExtractor]] for the sink; `classOf[Object]` when
+    *         the lookup fails with an [[InvalidTypesException]]
+    */
+  private[flink] def extractTableSinkTypeClass(sink: TableSink[_]): Class[_] = {
+    try {
+      sink match {
+        // DataStreamTableSink has no generic class, so the type class is taken from its
+        // output type.
+        case dataStreamSink: DataStreamTableSink[_] =>
+          dataStreamSink.getOutputType.getTypeClass
+        case genericSink =>
+          TypeExtractor
+            .createTypeInfo(genericSink, classOf[TableSink[_]], genericSink.getClass, 0)
+            .getTypeClass
+            .asInstanceOf[Class[_]]
+      }
+    } catch {
+      case _: InvalidTypesException => classOf[Object]
+    }
   }
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala
new file mode 100644
index 0000000..c207bd6
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
+import org.apache.flink.table.runtime.values.ValuesInputFormat
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.RexLiteral
+
+import com.google.common.collect.ImmutableList
+
+import scala.collection.JavaConversions._
+
+/**
+  * Code generator that turns the literal rows of a values node into a [[ValuesInputFormat]].
+  */
+object ValuesCodeGenerator {
+
+  /**
+    * Generates an input format that produces the given literal tuples.
+    *
+    * @param tableEnv    table environment whose config is used for code generation
+    * @param rowType     row type of the tuples
+    * @param tuples      literal rows to produce
+    * @param description name handed to the input format code generator
+    * @return input format wrapping the generated code and the row type information
+    */
+  def generatorInputFormat(
+      tableEnv: TableEnvironment,
+      rowType: RelDataType,
+      tuples: ImmutableList[ImmutableList[RexLiteral]],
+      description: String): ValuesInputFormat = {
+    val tableConfig = tableEnv.getConfig
+    val internalRowType = FlinkTypeFactory.toInternalRowType(rowType)
+    val context = CodeGeneratorContext(tableConfig)
+    val literalGenerator = new ExprCodeGenerator(context, false)
+
+    // one code snippet per tuple, each producing a GenericRow from the tuple's literals
+    val recordCodes = tuples.map { literals =>
+      val fieldExprs = literals.map(literal => literalGenerator.generateExpression(literal))
+      literalGenerator
+        .generateResultExpression(fieldExprs, internalRowType, classOf[GenericRow])
+        .code
+    }
+
+    // wrap the snippets into a generated input format
+    val generatedInput = InputFormatCodeGenerator.generateValuesInputFormat[BaseRow](
+      context, description, recordCodes, internalRowType)
+
+    val producedTypeInfo =
+      new BaseRowTypeInfo(internalRowType.getFieldTypes, internalRowType.getFieldNames)
+    new ValuesInputFormat(generatedInput, producedTypeInfo)
+  }
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
index 059a971..35eab30 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
@@ -18,14 +18,24 @@
package org.apache.flink.table.plan.nodes.physical.batch
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment,
TableException}
+import org.apache.flink.table.codegen.CodeGenUtils
+import org.apache.flink.table.codegen.SinkCodeGenerator._
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
import org.apache.flink.table.plan.nodes.calcite.Sink
-import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sinks.{BatchTableSink, DataStreamTableSink,
TableSink}
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
import java.util
+import scala.collection.JavaConversions._
+
/**
* Batch physical RelNode to to write data into an external sink defined by a
[[TableSink]].
*/
@@ -36,10 +46,70 @@ class BatchExecSink[T](
sink: TableSink[T],
sinkName: String)
extends Sink(cluster, traitSet, inputRel, sink, sinkName)
- with BatchPhysicalRel {
+ with BatchPhysicalRel
+ with BatchExecNode[Any] {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
new BatchExecSink(cluster, traitSet, inputs.get(0), sink, sinkName)
}
+ //~ ExecNode methods
-----------------------------------------------------------
+
+ /**
+ * Reports the dam behavior of the sink. For a sink operator the records do not pass
+ * through it, so its DamBehavior is FULL_DAM.
+ *
+ * @return The [[DamBehavior]] of the sink, always FULL_DAM.
+ */
+ override def getDamBehavior: DamBehavior = DamBehavior.FULL_DAM
+
+  /**
+    * Returns the single input of this sink as an [[ExecNode]].
+    *
+    * The Java list is built explicitly instead of relying on the deprecated implicit
+    * `JavaConversions` to convert a Scala `List`.
+    *
+    * @return an immutable one-element list holding the input node
+    */
+  override def getInputNodes: util.List[ExecNode[BatchTableEnvironment, _]] = {
+    util.Collections.singletonList[ExecNode[BatchTableEnvironment, _]](
+      getInput.asInstanceOf[ExecNode[BatchTableEnvironment, _]])
+  }
+
+  /**
+    * Translates this sink into a [[StreamTransformation]].
+    *
+    * A [[BatchTableSink]] receives the translated input as a bounded stream; a
+    * [[DataStreamTableSink]] is only translated. Any other sink is rejected.
+    *
+    * @param tableEnv the batch table environment used for the translation
+    * @return the resulting transformation
+    */
+  override protected def translateToPlanInternal(
+      tableEnv: BatchTableEnvironment): StreamTransformation[Any] = {
+    val sinkTransformation = sink match {
+      case boundedSink: BatchTableSink[T] =>
+        val input = translateToStreamTransformation(withChangeFlag = false, tableEnv)
+        val boundedInput = new DataStream(tableEnv.streamEnv, input)
+        val emitted = boundedSink.emitBoundedStream(
+          boundedInput, tableEnv.getConfig, tableEnv.streamEnv.getConfig)
+        emitted.getTransformation
+
+      case dataStreamSink: DataStreamTableSink[T] =>
+        // When a table is converted to a bounded stream (BatchTableEnvironment#toBoundedStream),
+        // a DataStreamTableSink is inserted and wrapped as a LogicalSink. There is no real
+        // batch table sink behind it, so TableSink#emitBoundedStream is not invoked and no
+        // resource is set; translating to a StreamTransformation is sufficient.
+        translateToStreamTransformation(
+          withChangeFlag = dataStreamSink.withChangeFlag, tableEnv)
+
+      case _ =>
+        throw new TableException("Only Support BatchTableSink or DataStreamTableSink!")
+    }
+    sinkTransformation.asInstanceOf[StreamTransformation[Any]]
+  }
+
+  /**
+    * Translates the input of this sink into a [[StreamTransformation]].
+    *
+    * The output type of the sink is validated first. The plan of the input is only reused
+    * as-is when the type class of the sink is an internal class; converting the records to
+    * another class is not supported yet.
+    *
+    * @param withChangeFlag requested change-flag mode; not evaluated by this method
+    * @param tableEnv       the batch table environment used for the translation
+    * @return the transformation of the input node
+    * @throws TableException if a sink conversion would be required or the input is not a
+    *                        [[BatchExecNode]]
+    */
+  private def translateToStreamTransformation(
+      withChangeFlag: Boolean,
+      tableEnv: BatchTableEnvironment): StreamTransformation[T] = {
+    val resultType = sink.getOutputType
+    TableEnvironment.validateType(resultType)
+    val inputNode = getInputNodes.get(0)
+    inputNode match {
+      // Sink's input must be BatchExecNode[BaseRow] now.
+      case node: BatchExecNode[BaseRow] =>
+        val plan = node.translateToPlan(tableEnv)
+        // TODO support SinkConversion after FLINK-11974 is done
+        val typeClass = extractTableSinkTypeClass(sink)
+        if (CodeGenUtils.isInternalClass(typeClass, resultType)) {
+          plan.asInstanceOf[StreamTransformation[T]]
+        } else {
+          throw new TableException("Not support SinkConversion now.")
+        }
+      case _ =>
+        throw new TableException(
+          "Cannot generate BoundedStream due to an invalid logical plan. " +
+            "This is a bug and should not happen. Please file an issue.")
+    }
+  }
+
+
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
index 00f47827..ad312f2 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
@@ -18,13 +18,21 @@
package org.apache.flink.table.plan.nodes.physical.batch
-import com.google.common.collect.ImmutableList
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.codegen.ValuesCodeGenerator
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.Values
import org.apache.calcite.rel.{RelNode, RelWriter}
import org.apache.calcite.rex.RexLiteral
+import com.google.common.collect.ImmutableList
+
import java.util
import scala.collection.JavaConversions._
@@ -38,7 +46,8 @@ class BatchExecValues(
tuples: ImmutableList[ImmutableList[RexLiteral]],
outputRowType: RelDataType)
extends Values(cluster, outputRowType, tuples, traitSet)
- with BatchPhysicalRel {
+ with BatchPhysicalRel
+ with BatchExecNode[BaseRow] {
override def deriveRowType(): RelDataType = outputRowType
@@ -51,5 +60,23 @@ class BatchExecValues(
.item("values", getRowType.getFieldNames.toList.mkString(", "))
}
+ //~ ExecNode methods
-----------------------------------------------------------
+
+ /** Reports PIPELINED as the [[DamBehavior]] of this values node. */
+ override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
+
+  /**
+    * Translates the values into a source transformation backed by a generated input format.
+    *
+    * @param tableEnv the batch table environment whose stream environment creates the input
+    * @return the transformation of the created input
+    */
+  override protected def translateToPlanInternal(
+      tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
+    val valuesFormat =
+      ValuesCodeGenerator.generatorInputFormat(tableEnv, getRowType, tuples, getRelTypeName)
+    val valuesSource = tableEnv.streamEnv.createInput(valuesFormat, valuesFormat.getProducedType)
+    valuesSource.getTransformation
+  }
+
+  /** A values node has no inputs, so a new empty list is returned. */
+  override def getInputNodes: util.List[ExecNode[BatchTableEnvironment, _]] =
+    new util.ArrayList[ExecNode[BatchTableEnvironment, _]]()
+
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
index 7b74b67..c9adccc 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
@@ -18,7 +18,17 @@
package org.apache.flink.table.plan.nodes.physical.stream
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{StreamTableEnvironment, Table,
TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenUtils
+import
org.apache.flink.table.codegen.SinkCodeGenerator.extractTableSinkTypeClass
+import org.apache.flink.table.dataformat.BaseRow
import org.apache.flink.table.plan.nodes.calcite.Sink
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef}
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.sinks._
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -26,6 +36,8 @@ import org.apache.calcite.rel.RelNode
import java.util
+import scala.collection.JavaConversions._
+
/**
* Stream physical RelNode to to write data into an external sink defined by
a [[TableSink]].
*/
@@ -36,12 +48,13 @@ class StreamExecSink[T](
sink: TableSink[T],
sinkName: String)
extends Sink(cluster, traitSet, inputRel, sink, sinkName)
- with StreamPhysicalRel {
+ with StreamPhysicalRel
+ with StreamExecNode[Any] {
override def producesUpdates: Boolean = false
override def needsUpdatesAsRetraction(input: RelNode): Boolean =
- sink.isInstanceOf[BaseRetractStreamTableSink[_]]
+ sink.isInstanceOf[RetractStreamTableSink[_]]
override def consumesRetractions: Boolean = false
@@ -53,5 +66,107 @@ class StreamExecSink[T](
new StreamExecSink(cluster, traitSet, inputs.get(0), sink, sinkName)
}
+ //~ ExecNode methods
-----------------------------------------------------------
+
+  /**
+    * Returns the single input of this sink as an [[ExecNode]].
+    *
+    * The Java list is built explicitly instead of relying on the deprecated implicit
+    * `JavaConversions` to convert a Scala `List`.
+    *
+    * @return an immutable one-element list holding the input node
+    */
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = {
+    util.Collections.singletonList[ExecNode[StreamTableEnvironment, _]](
+      getInput.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  /**
+    * Translates this sink into a [[StreamTransformation]].
+    *
+    * A [[StreamTableSink]] must be a retract, upsert or append sink; the translated input is
+    * handed to its `emitDataStream`. A [[DataStreamTableSink]] is only translated. Any
+    * other sink is rejected.
+    *
+    * @param tableEnv the stream table environment used for the translation
+    * @return the resulting transformation
+    */
+  override protected def translateToPlanInternal(
+      tableEnv: StreamTableEnvironment): StreamTransformation[Any] = {
+    val sinkTransformation = sink match {
+      case externalSink: StreamTableSink[T] =>
+        val input = externalSink match {
+          case _: RetractStreamTableSink[T] =>
+            translateToStreamTransformation(withChangeFlag = true, tableEnv)
+
+          case upsertSink: UpsertStreamTableSink[T] =>
+            // tell the sink whether the table is append-only
+            val appendOnly = UpdatingPlanChecker.isAppendOnly(this)
+            upsertSink.setIsAppendOnly(appendOnly)
+            translateToStreamTransformation(withChangeFlag = true, tableEnv)
+
+          case _: AppendStreamTableSink[T] =>
+            // an append sink accepts only insert-only (append-only) tables ...
+            if (!UpdatingPlanChecker.isAppendOnly(this)) {
+              throw new TableException(
+                "AppendStreamTableSink requires that Table has only insert changes.")
+            }
+            // ... and cannot output retraction messages
+            val sinkAccMode = this.getTraitSet.getTrait(AccModeTraitDef.INSTANCE).getAccMode
+            if (sinkAccMode == AccMode.AccRetract) {
+              throw new TableException(
+                "AppendStreamTableSink can not be used to output retraction messages.")
+            }
+            translateToStreamTransformation(withChangeFlag = false, tableEnv)
+
+          case _ =>
+            throw new TableException(
+              "Stream Tables can only be emitted by AppendStreamTableSink, " +
+                "RetractStreamTableSink, or UpsertStreamTableSink.")
+        }
+        val inputStream = new DataStream(tableEnv.execEnv, input)
+        externalSink.emitDataStream(inputStream).getTransformation
+
+      case dataStreamSink: DataStreamTableSink[_] =>
+        // A DataStreamTableSink is not a real external sink (presumably it is inserted when
+        // a table is converted to a DataStream -- confirm against the table environment),
+        // so nothing is emitted here; translating the input to a StreamTransformation is
+        // sufficient.
+        translateToStreamTransformation(dataStreamSink.withChangeFlag, tableEnv)
+
+      case _ =>
+        throw new TableException("Only Support StreamTableSink or DataStreamTableSink!")
+    }
+    sinkTransformation.asInstanceOf[StreamTransformation[Any]]
+  }
+
+  /**
+    * Translates the input of this sink into a [[StreamTransformation]].
+    *
+    * The plan of the input is only reused as-is when the type class of the sink is an
+    * internal class; converting the records to another class is not supported yet.
+    *
+    * @param withChangeFlag Set to true to emit records with change flags. If false, the input
+    *                       must be an append-only table.
+    * @param tableEnv       The stream table environment used for the translation.
+    * @return The [[StreamTransformation]] that corresponds to the translated [[Table]].
+    * @throws TableException if the table is not append-only although no change flags are
+    *                        requested, if the input is not a [[StreamExecNode]], if more than
+    *                        one rowtime field is found, or if a sink conversion is required.
+    */
+  private def translateToStreamTransformation(
+      withChangeFlag: Boolean,
+      tableEnv: StreamTableEnvironment): StreamTransformation[T] = {
+    val inputNode = getInput
+    // if no change flags are requested, verify table is an insert-only (append-only) table.
+    if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(inputNode)) {
+      throw new TableException(
+        "Table is not an append-only table. " +
+          "Use the toRetractStream() in order to handle add and retract messages.")
+    }
+
+    // get BaseRow plan
+    val parTransformation = inputNode match {
+      // Sink's input must be StreamExecNode[BaseRow] now.
+      case node: StreamExecNode[BaseRow] =>
+        node.translateToPlan(tableEnv)
+      case _ =>
+        throw new TableException(
+          "Cannot generate DataStream due to an invalid logical plan. " +
+            "This is a bug and should not happen. Please file an issue.")
+    }
+
+    // at most one rowtime field may remain in the table handed to the sink
+    val logicalType = inputNode.getRowType
+    val rowtimeFields = logicalType.getFieldList
+      .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    if (rowtimeFields.size > 1) {
+      throw new TableException(
+        s"Found more than one rowtime field: [${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+          s"the table that should be converted to a DataStream.\n" +
+          s"Please select the rowtime field that should be used as event-time timestamp for the " +
+          s"DataStream by casting all other fields to TIMESTAMP.")
+    }
+    val resultType = sink.getOutputType
+    // TODO support SinkConversion after FLINK-11974 is done
+    val typeClass = extractTableSinkTypeClass(sink)
+    if (CodeGenUtils.isInternalClass(typeClass, resultType)) {
+      parTransformation.asInstanceOf[StreamTransformation[T]]
+    } else {
+      throw new TableException("Not support SinkConversion now.")
+    }
+
+  }
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
index b87080a..4b63f18 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
@@ -18,13 +18,22 @@
package org.apache.flink.table.plan.nodes.physical.stream
-import com.google.common.collect.ImmutableList
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfigOptions,
TableException}
+import org.apache.flink.table.codegen.ValuesCodeGenerator
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+
import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.Values
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Values
import org.apache.calcite.rex.RexLiteral
+import com.google.common.collect.ImmutableList
+
+import java.util
+
/**
* Stream physical RelNode for [[Values]].
*/
@@ -34,7 +43,8 @@ class StreamExecValues(
tuples: ImmutableList[ImmutableList[RexLiteral]],
outputRowType: RelDataType)
extends Values(cluster, outputRowType, tuples, traitSet)
- with StreamPhysicalRel {
+ with StreamPhysicalRel
+ with StreamExecNode[BaseRow] {
override def producesUpdates: Boolean = false
@@ -51,4 +61,28 @@ class StreamExecValues(
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]):
RelNode = {
new StreamExecValues(cluster, traitSet, getTuples, outputRowType)
}
+
+ //~ ExecNode methods
-----------------------------------------------------------
+
+  /**
+    * Translates the values into a source transformation backed by a generated input format.
+    *
+    * @param tableEnv the stream table environment whose execution environment creates the
+    *                 input
+    * @return the transformation of the created input
+    * @throws TableException if the values input is not enabled in the table config
+    */
+  override protected def translateToPlanInternal(
+      tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
+    val valuesInputEnabled = tableEnv.getConfig.getConf.getBoolean(
+      TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED)
+    if (!valuesInputEnabled) {
+      // enable this feature when runtime support do checkpoint when source finished
+      throw new TableException("Values source input is not supported currently. Probably " +
+        "there is a where condition which always returns false in your query.")
+    }
+
+    val valuesFormat =
+      ValuesCodeGenerator.generatorInputFormat(tableEnv, getRowType, tuples, getRelTypeName)
+    val valuesSource = tableEnv.execEnv.createInput(valuesFormat, valuesFormat.getProducedType)
+    valuesSource.getTransformation
+  }
+
+  /** A values node has no inputs, so a new empty list is returned. */
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] =
+    new util.ArrayList[ExecNode[StreamTableEnvironment, _]]()
+
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
index e54c9d2..97ff92b 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
@@ -31,7 +31,7 @@ trait Optimizer {
* <p>NOTES:
* <p>1. The reused node in result DAG will be converted to the same
RelNode.
* <p>2. If a root node requires retract changes on Stream, the node should
be
- * a [[org.apache.flink.table.sinks.BaseRetractStreamTableSink]] or
+ * a [[org.apache.flink.table.sinks.RetractStreamTableSink]] or
* a regular node with
[[org.apache.flink.table.plan.trait.UpdateAsRetractionTrait]]
* which `updateAsRetraction` is true.
*
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index e2bd710..ac05926 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -209,6 +209,7 @@ object FlinkBatchRuleSets {
BatchExecScanTableSourceRule.INSTANCE,
BatchExecValuesRule.INSTANCE,
BatchExecCalcRule.INSTANCE,
- BatchExecUnionRule.INSTANCE
+ BatchExecUnionRule.INSTANCE,
+ BatchExecSinkRule.INSTANCE
)
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index a32c926..e8b96ef 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -215,7 +215,8 @@ object FlinkStreamRuleSets {
StreamExecTableSourceScanRule.INSTANCE,
StreamExecValuesRule.INSTANCE,
StreamExecCalcRule.INSTANCE,
- StreamExecUnionRule.INSTANCE
+ StreamExecUnionRule.INSTANCE,
+ StreamExecSinkRule.INSTANCE
)
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
new file mode 100644
index 0000000..93aaeb6
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.batch
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Rule that converts a [[FlinkLogicalSink]] of the LOGICAL convention into a
+  * [[BatchExecSink]] of the BATCH_PHYSICAL convention.
+  */
+class BatchExecSinkRule extends ConverterRule(
+    classOf[FlinkLogicalSink],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.BATCH_PHYSICAL,
+    "BatchExecSinkRule") {
+
+  /**
+    * Creates the physical sink: the trait set and the input of the logical sink are moved
+    * to the BATCH_PHYSICAL convention; sink and sink name are carried over unchanged.
+    */
+  def convert(rel: RelNode): RelNode = {
+    val logicalSink = rel.asInstanceOf[FlinkLogicalSink]
+    val physicalTraits = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+    // TODO Take PartitionableSink into consideration after FLINK-11993 is done
+    val physicalInput =
+      RelOptRule.convert(logicalSink.getInput, FlinkConventions.BATCH_PHYSICAL)
+
+    new BatchExecSink(
+      rel.getCluster, physicalTraits, physicalInput, logicalSink.sink, logicalSink.sinkName)
+  }
+}
+
+/** Holds the shared instance of [[BatchExecSinkRule]]. */
+object BatchExecSinkRule {
+
+  val INSTANCE: RelOptRule = new BatchExecSinkRule
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
new file mode 100644
index 0000000..5ae6ead
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.RelNode
+
+/**
+  * Converter rule that turns a [[FlinkLogicalSink]] of the logical convention into a
+  * [[StreamExecSink]] of the stream physical convention.
+  */
+class StreamExecSinkRule extends ConverterRule(
+    classOf[FlinkLogicalSink],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.STREAM_PHYSICAL,
+    "StreamExecSinkRule") {
+
+  /**
+    * Builds the physical sink for the given logical sink.
+    *
+    * @param rel the node to convert; it is cast to [[FlinkLogicalSink]]
+    * @return a [[StreamExecSink]] whose traits and input use the stream physical convention
+    */
+  def convert(rel: RelNode): RelNode = {
+    val logicalSink = rel.asInstanceOf[FlinkLogicalSink]
+    val physicalTraits = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+    // TODO Take PartitionableSink into consideration after FLINK-11993 is done
+    val physicalInput = RelOptRule.convert(
+      logicalSink.getInput,
+      FlinkConventions.STREAM_PHYSICAL)
+
+    new StreamExecSink(
+      rel.getCluster,
+      physicalTraits,
+      physicalInput,
+      logicalSink.sink,
+      logicalSink.sinkName)
+  }
+}
+
+/** Holds the shared instance of [[StreamExecSinkRule]]. */
+object StreamExecSinkRule {
+  val INSTANCE: RelOptRule = new StreamExecSinkRule
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
similarity index 67%
copy from
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
copy to
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
index 82b9590..d0c8807 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
@@ -18,17 +18,15 @@
package org.apache.flink.table.sinks
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.Table
/**
- * Defines an external [[TableSink]] to emit a streaming [[Table]] with
insert, update, and delete
- * changes.
+ * Defines an external [[TableSink]] to emit a streaming [[Table]] with only insert changes.
*
- * @tparam T Type of records that this [[TableSink]] expects and supports.
+ * If the [[Table]] is also modified by update or delete changes, a
+ * [[org.apache.flink.table.api.TableException]] will be thrown.
+ *
+ * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and
supports.
*/
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
-
- /** Emits the DataStream. */
- def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
-}
+trait AppendStreamTableSink[T] extends StreamTableSink[T]
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
similarity index 69%
copy from
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
copy to
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
index 82b9590..8ab44b4 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
@@ -18,17 +18,19 @@
package org.apache.flink.table.sinks
+import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api.Table
+import org.apache.flink.table.api._
-/**
- * Defines an external [[TableSink]] to emit a streaming [[Table]] with
insert, update, and delete
- * changes.
+/** Defines an external [[TableSink]] to emit a batch [[Table]].
*
- * @tparam T Type of records that this [[TableSink]] expects and supports.
+ * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and
supports.
*/
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
+trait BatchTableSink[T] extends TableSink[T] {
/** Emits the DataStream. */
- def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
+ def emitBoundedStream(
+ boundedStream: DataStream[T],
+ tableConfig: TableConfig,
+ executionConfig: ExecutionConfig): DataStreamSink[_]
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
new file mode 100644
index 0000000..27a7d49
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.table.api._
+import org.apache.flink.types.Row
+
+/**
+  * A simple [[TableSink]] to emit data as T to a collection.
+  *
+  * Records are written through a [[CollectOutputFormat]] that is created by [[init]], so
+  * [[init]] has to be called before [[emitBoundedStream]].
+  *
+  * @param produceOutputType derives the output [[TypeInformation]] from the sink's field types
+  * @tparam T type of the records emitted by this sink
+  */
+class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => TypeInformation[T]))
+  extends TableSinkBase[T] with BatchTableSink[T] {
+
+  private var collectOutputFormat: CollectOutputFormat[T] = _
+
+  /**
+    * Writes the bounded stream with the collecting output format and names the sink "collect".
+    *
+    * @throws TableException if [[init]] has not been called on this sink
+    */
+  override def emitBoundedStream(
+      boundedStream: DataStream[T],
+      tableConfig: TableConfig,
+      executionConfig: ExecutionConfig): DataStreamSink[T] = {
+    // Fail fast with a clear message instead of handing a null output format to the stream.
+    if (collectOutputFormat == null) {
+      throw new TableException(
+        "CollectTableSink is not initialized, init() must be called before emitting data.")
+    }
+    boundedStream.writeUsingOutputFormat(collectOutputFormat)
+      .name("collect")
+  }
+
+  /** Creates a new sink with the same output type factory and the same output format. */
+  override protected def copy: TableSinkBase[T] = {
+    val sink = new CollectTableSink(produceOutputType)
+    // Carry the output format over so that a copy made after init() stays usable.
+    sink.collectOutputFormat = collectOutputFormat
+    sink
+  }
+
+  /** Returns the output type derived from the current field types. */
+  override def getOutputType: TypeInformation[T] = {
+    produceOutputType(getFieldTypes)
+  }
+
+  /**
+    * Creates the output format used by [[emitBoundedStream]].
+    *
+    * @param typeSerializer serializer handed to the [[CollectOutputFormat]]
+    * @param id             identifier handed to the [[CollectOutputFormat]]
+    */
+  def init(typeSerializer: TypeSerializer[T], id: String): Unit = {
+    collectOutputFormat = new CollectOutputFormat(id, typeSerializer)
+  }
+}
+
+/**
+  * Output format that collects every written record in a [[SerializedListAccumulator]] and
+  * registers that accumulator under the given id when the format is closed.
+  *
+  * @param id             name under which the accumulator is registered
+  * @param typeSerializer serializer used when adding records to the accumulator
+  * @tparam T type of the collected records
+  */
+class CollectOutputFormat[T](id: String, typeSerializer: TypeSerializer[T])
+  extends RichOutputFormat[T] {
+
+  private var accumulator: SerializedListAccumulator[T] = _
+
+  /** Nothing to configure. */
+  override def configure(parameters: Configuration): Unit = {}
+
+  /** Starts with a fresh, empty accumulator. */
+  override def open(taskNumber: Int, numTasks: Int): Unit = {
+    accumulator = new SerializedListAccumulator[T]
+  }
+
+  /** Adds the record to the accumulator using the configured serializer. */
+  override def writeRecord(record: T): Unit = {
+    accumulator.add(record, typeSerializer)
+  }
+
+  override def close(): Unit = {
+    // Important: should only be added in close method to minimize traffic of accumulators
+    getRuntimeContext.addAccumulator(id, accumulator)
+  }
+}
+
+/** A [[CollectTableSink]] for [[Row]] records whose output type is a [[RowTypeInfo]]. */
+class CollectRowTableSink
+  extends CollectTableSink[Row](fieldTypes => new RowTypeInfo(fieldTypes: _*))
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
new file mode 100644
index 0000000..c4a308d
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{Table, TableException}
+
+/**
+  * A [[DataStreamTableSink]] specifies how to emit a [[Table]] to a DataStream[T].
+  *
+  * @param table The [[Table]] to emit.
+  * @param outputType The [[TypeInformation]] that specifies the type of the [[DataStream]].
+  * @param updatesAsRetraction Set to true to encode updates as retraction messages.
+  * @param withChangeFlag Set to true to emit records with change flags.
+  * @tparam T The type of the resulting [[DataStream]].
+  */
+@Internal
+class DataStreamTableSink[T](
+    table: Table,
+    outputType: TypeInformation[T],
+    val updatesAsRetraction: Boolean,
+    val withChangeFlag: Boolean) extends TableSink[T] {
+
+  // Schema of the emitted table; looked up on first use.
+  private lazy val tableSchema = table.getSchema
+
+  /** Returns the names of the table fields, taken from the table's schema. */
+  override def getFieldNames: Array[String] = tableSchema.getFieldNames
+
+  /** Returns the types of the table fields, copied into a new array. */
+  override def getFieldTypes: Array[TypeInformation[_]] =
+    Array(tableSchema.getFieldTypes: _*)
+
+  /**
+    * Returns the type expected by this [[TableSink]], i.e. the output type passed to the
+    * constructor.
+    */
+  override def getOutputType: TypeInformation[T] = outputType
+
+  /** Not supported by this sink; always throws a [[TableException]]. */
+  override def configure(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[T] =
+    throw new TableException("configure is not supported.")
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
similarity index 67%
copy from
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
copy to
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
index 82b9590..2ee0449 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
@@ -18,8 +18,12 @@
package org.apache.flink.table.sinks
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api.Table
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.{Table, Types}
+
+import java.lang.{Boolean => JBool}
/**
* Defines an external [[TableSink]] to emit a streaming [[Table]] with
insert, update, and delete
@@ -27,8 +31,11 @@ import org.apache.flink.table.api.Table
*
* @tparam T Type of records that this [[TableSink]] expects and supports.
*/
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
+trait RetractStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
+
+ /** Returns the requested record type */
+ def getRecordType: TypeInformation[T]
+
+ override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
- /** Emits the DataStream. */
- def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
new file mode 100644
index 0000000..330faac
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.{Table, Types}
+
+import java.lang.{Boolean => JBool}
+
+
+/**
+  * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and
+  * delete changes. The [[Table]] must have unique key fields (atomic or composite) or be
+  * append-only.
+  *
+  * If the [[Table]] does not have a unique key and is not append-only, a
+  * [[org.apache.flink.table.api.TableException]] will be thrown.
+  *
+  * The unique key of the table is configured by the [[UpsertStreamTableSink#setKeyFields()]]
+  * method.
+  *
+  * If the table is append-only, all messages will have a true flag and must be interpreted
+  * as insertions.
+  *
+  * @tparam T Type of records that this [[TableSink]] expects and supports.
+  */
+trait UpsertStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
+
+  /**
+    * Configures the unique key fields of the [[Table]] to write.
+    * The method is called after [[TableSink.configure()]].
+    *
+    * The keys array might be empty, if the table consists of a single (updated) record.
+    * If the table does not have a key and is append-only, the keys attribute is null.
+    *
+    * @param keys the field names of the table's keys, an empty array if the table has a
+    *             single row, and null if the table is append-only and has no key.
+    */
+  def setKeyFields(keys: Array[String]): Unit
+
+  /**
+    * Specifies whether the [[Table]] to write is append-only or not.
+    *
+    * @param isAppendOnly true if the table is append-only, false otherwise.
+    */
+  def setIsAppendOnly(isAppendOnly: JBool): Unit
+
+  /** Returns the requested record type */
+  def getRecordType: TypeInformation[T]
+
+  /** Returns a tuple type made of a boolean field followed by the requested record type. */
+  override def getOutputType: TupleTypeInfo[JTuple2[JBool, T]] =
+    new TupleTypeInfo[JTuple2[JBool, T]](Types.BOOLEAN, getRecordType)
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowUtil.java
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowUtil.java
new file mode 100644
index 0000000..457e13f
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.StringUtils;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.TimeZone;
+
+/**
+ * Utility for rendering a {@link BaseRow} as a string.
+ */
+public class BaseRowUtil {
+
+	/**
+	 * Renders the row with its header; same as calling the four-argument overload with
+	 * {@code withHeader = true}.
+	 */
+	public static String baseRowToString(BaseRow value, BaseRowTypeInfo rowTypeInfo, TimeZone tz) {
+		return baseRowToString(value, rowTypeInfo, tz, true);
+	}
+
+	/**
+	 * Renders the row as comma-separated fields, optionally prefixed by {@code <header>|}.
+	 *
+	 * @param value       the row to render
+	 * @param rowTypeInfo type information used to read the fields of a non-generic row
+	 * @param tz          time zone passed on to the field rendering
+	 * @param withHeader  whether the row header is written in front of the fields
+	 */
+	public static String baseRowToString(
+			BaseRow value,
+			BaseRowTypeInfo rowTypeInfo,
+			TimeZone tz,
+			boolean withHeader) {
+		return genericRowToString(toGenericRow(value, rowTypeInfo), tz, withHeader);
+	}
+
+	/** Renders a single field; date, time and timestamp values are not supported yet. */
+	private static String fieldToString(Object field, TimeZone tz) {
+		boolean isTemporal =
+			field instanceof Date || field instanceof Time || field instanceof Timestamp;
+		if (!isTemporal) {
+			return StringUtils.arrayAwareToString(field);
+		}
+		// TODO support after FLINK-11898 is merged
+		throw new UnsupportedOperationException();
+	}
+
+	/** Joins the rendered fields with "," and prepends the header when requested. */
+	private static String genericRowToString(GenericRow row, TimeZone tz, boolean withHeader) {
+		StringBuilder sb = new StringBuilder();
+		if (withHeader) {
+			sb.append(row.getHeader()).append("|");
+		}
+		String separator = "";
+		for (int i = 0; i < row.getArity(); i++) {
+			sb.append(separator).append(fieldToString(row.getField(i), tz));
+			separator = ",";
+		}
+		return sb.toString();
+	}
+
+	/** Returns the row itself if it is a GenericRow, otherwise copies header and fields into one. */
+	private static GenericRow toGenericRow(BaseRow baseRow, BaseRowTypeInfo baseRowTypeInfo) {
+		if (baseRow instanceof GenericRow) {
+			return (GenericRow) baseRow;
+		}
+
+		int fieldNum = baseRow.getArity();
+		GenericRow row = new GenericRow(fieldNum);
+		row.setHeader(baseRow.getHeader());
+		InternalType[] internalTypes = baseRowTypeInfo.getInternalTypes();
+		for (int i = 0; i < fieldNum; i++) {
+			Object field = baseRow.isNullAt(i)
+				? null
+				: TypeGetterSetters.get(baseRow, i, internalTypes[i]);
+			row.setField(i, field);
+		}
+		return row;
+	}
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/ValuesITCase.scala
similarity index 61%
rename from
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
rename to
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/ValuesITCase.scala
index 82b9590..19f3d55 100644
---
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/ValuesITCase.scala
@@ -16,19 +16,22 @@
* limitations under the License.
*/
-package org.apache.flink.table.sinks
+package org.apache.flink.table.runtime.batch.sql
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api.Table
+import org.apache.flink.table.runtime.utils.BatchTestBase
-/**
- * Defines an external [[TableSink]] to emit a streaming [[Table]] with
insert, update, and delete
- * changes.
- *
- * @tparam T Type of records that this [[TableSink]] expects and supports.
- */
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
+import org.junit.Assert._
+import org.junit.Test
+
+class ValuesITCase extends BatchTestBase {
+
+ @Test
+ def testValues(): Unit = {
+ val sqlQuery = "SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)"
+ val table = tEnv.sqlQuery(sqlQuery)
+ val actual = collectResults(table)
+ val expected = List("1,2,3")
+ assertEquals(expected.sorted, actual.sorted)
+ }
- /** Emits the DataStream. */
- def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/ValuesITCase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/ValuesITCase.scala
new file mode 100644
index 0000000..d081cb5
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/ValuesITCase.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.runtime.utils.{StreamingTestBase,
TestingAppendBaseRowSink}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.`type`.InternalTypes.INT
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Runs a simple VALUES query as an append stream and checks the collected rows. */
+class ValuesITCase extends StreamingTestBase {
+
+  @Test
+  def testValues(): Unit = {
+    val tableConf = tEnv.getConfig.getConf
+    tableConf.setBoolean(TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED, true)
+
+    val rowType = new BaseRowTypeInfo(INT, INT, INT)
+    val rows = tEnv
+      .sqlQuery("SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)")
+      .toAppendStream[BaseRow]
+
+    val sink = new TestingAppendBaseRowSink(rowType)
+    rows.addSink(sink).setParallelism(1)
+    env.execute()
+
+    // The sink renders each row as "<header>|<comma-separated fields>".
+    assertEquals(List("0|1,2,3").sorted, sink.getAppendResults.sorted)
+  }
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
new file mode 100644
index 0000000..0ef3c86
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.scala.{BatchTableEnvironment =>
ScalaBatchTableEnv}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.sinks.{CollectTableSink, TableSinkBase}
+import
org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.util.BaseRowUtil
+import org.apache.flink.util.AbstractID
+
+import _root_.java.util.{TimeZone, ArrayList => JArrayList}
+
+import _root_.scala.collection.JavaConversions._
+
+import org.junit.Before
+
+/** Base class for batch tests: sets up the environments and collects the rows of a table. */
+class BatchTestBase {
+
+  var env: StreamExecutionEnvironment = _
+  val conf: TableConfig = new TableConfig
+
+  // scala tableEnv
+  var tEnv: ScalaBatchTableEnv = _
+
+  @Before
+  def before(): Unit = {
+    // the scala env wraps a local java env
+    env = new StreamExecutionEnvironment(new LocalStreamEnvironment())
+    tEnv = ScalaBatchTableEnv.create(env)
+  }
+
+  /**
+    * Writes the table to a collecting sink, executes the job and returns every collected row
+    * rendered as a string without header.
+    */
+  def collectResults(table: Table): Seq[String] = {
+    val schema = table.getSchema
+    val sink = new CollectBaseRowTableSink()
+      .configure(schema.getFieldNames, schema.getFieldTypes)
+      .asInstanceOf[CollectBaseRowTableSink]
+    val rowType = sink.getOutputType.asInstanceOf[BaseRowTypeInfo]
+    val serializer = rowType.createSerializer(new ExecutionConfig)
+    // the same id is used to register and to look up the accumulator
+    val accumulatorId = new AbstractID().toString
+    sink.init(serializer, accumulatorId)
+
+    tEnv.writeToSink(table, sink, "test")
+    val jobResult = env.execute()
+
+    val serializedRows: JArrayList[Array[Byte]] = jobResult.getAccumulatorResult(accumulatorId)
+    val rows: Seq[BaseRow] =
+      SerializedListAccumulator.deserializeList(serializedRows, serializer)
+    val utc = TimeZone.getTimeZone("UTC")
+    rows.map(row => BaseRowUtil.baseRowToString(row, rowType, utc, false))
+  }
+
+}
+
+/**
+  * A [[CollectTableSink]] for [[BaseRow]] records. Its output type is a [[BaseRowTypeInfo]]
+  * built from the field types after converting each of them to an internal type.
+  */
+class CollectBaseRowTableSink
+  extends CollectTableSink[BaseRow](
+    fieldTypes => new BaseRowTypeInfo(fieldTypes.map(createInternalTypeFromTypeInfo): _*)) {
+
+  /** Creates a new, uninitialized sink of the same class. */
+  override protected def copy: TableSinkBase[BaseRow] = new CollectBaseRowTableSink
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
new file mode 100644
index 0000000..3c5bdda
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.runtime.state.{FunctionInitializationContext,
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.util.BaseRowUtil
+
+import _root_.java.util.TimeZone
+import _root_.java.util.concurrent.atomic.AtomicInteger
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
+import _root_.scala.collection.mutable.ArrayBuffer
+
+/**
+  * Process-wide registry of the results collected by the test sinks.
+  *
+  * Every map is keyed by sink id. All mutations of the maps are guarded by this object's
+  * monitor.
+  */
+object StreamTestSink {
+
+  // Evaluated when the object is initialized: switches the JVM default time zone to UTC.
+  TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+
+  private[utils] val idCounter: AtomicInteger = new AtomicInteger(0)
+
+  private[utils] val globalResults =
+    mutable.HashMap.empty[Int, mutable.Map[Int, ArrayBuffer[String]]]
+  private[utils] val globalRetractResults =
+    mutable.HashMap.empty[Int, mutable.Map[Int, ArrayBuffer[String]]]
+  private[utils] val globalUpsertResults =
+    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
+
+  /** Allocates the next sink id and registers empty result maps for it. */
+  private[utils] def getNewSinkId: Int = {
+    val idx = idCounter.getAndIncrement()
+    this.synchronized {
+      globalResults.put(idx, mutable.HashMap.empty[Int, ArrayBuffer[String]])
+      globalRetractResults.put(idx, mutable.HashMap.empty[Int, ArrayBuffer[String]])
+      globalUpsertResults.put(idx, mutable.HashMap.empty[Int, mutable.Map[String, String]])
+    }
+    idx
+  }
+
+  /**
+    * Drops the results of all sinks.
+    *
+    * Takes the same lock as the other mutations so that it cannot interleave with a sink that
+    * is registering or removing its results.
+    */
+  def clear(): Unit = this.synchronized {
+    globalResults.clear()
+    globalRetractResults.clear()
+    globalUpsertResults.clear()
+  }
+}
+
+/**
+  * Base class for test sinks that keep their collected results in operator list state and
+  * publish them through [[StreamTestSink]].
+  *
+  * @tparam T type of the records consumed by the sink
+  */
+abstract class AbstractExactlyOnceSink[T]
+  extends RichSinkFunction[T]
+  with CheckpointedFunction {
+
+  protected var resultsState: ListState[String] = _
+  protected var localResults: ArrayBuffer[String] = _
+  protected val idx: Int = StreamTestSink.getNewSinkId
+
+  protected var globalResults: mutable.Map[Int, ArrayBuffer[String]] = _
+  protected var globalRetractResults: mutable.Map[Int, ArrayBuffer[String]] = _
+  protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, String]] = _
+
+  /**
+    * Obtains the list state, refills the local buffer from it after a restore and registers
+    * the buffer under this subtask's index in the global results of this sink.
+    */
+  override def initializeState(context: FunctionInitializationContext): Unit = {
+    val descriptor = new ListStateDescriptor[String]("sink-results", Types.STRING)
+    resultsState = context.getOperatorStateStore.getListState(descriptor)
+
+    localResults = mutable.ArrayBuffer.empty[String]
+    if (context.isRestored) {
+      localResults ++= resultsState.get().asScala
+    }
+
+    val subtaskIndex = getRuntimeContext.getIndexOfThisSubtask
+    StreamTestSink.synchronized {
+      StreamTestSink.globalResults(idx) += (subtaskIndex -> localResults)
+    }
+  }
+
+  /** Replaces the content of the list state with the current local buffer. */
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    resultsState.clear()
+    localResults.foreach(value => resultsState.add(value))
+  }
+
+  /**
+    * On the first call, moves this sink's entries out of [[StreamTestSink]] into the local
+    * fields; later calls do nothing.
+    */
+  protected def clearAndStashGlobalResults(): Unit = {
+    if (globalResults == null) {
+      StreamTestSink.synchronized {
+        globalResults = StreamTestSink.globalResults.remove(idx).get
+        globalRetractResults = StreamTestSink.globalRetractResults.remove(idx).get
+        globalUpsertResults = StreamTestSink.globalUpsertResults.remove(idx).get
+      }
+    }
+  }
+
+  /** Returns the results of all subtasks concatenated into one list. */
+  protected def getResults: List[String] = {
+    clearAndStashGlobalResults()
+    globalResults.toList.flatMap { case (_, records) => records }
+  }
+}
+
+/**
+  * Test sink that stores every received [[BaseRow]] as a string rendered by [[BaseRowUtil]].
+  *
+  * @param rowTypeInfo type information passed to the row rendering
+  * @param tz          time zone passed to the row rendering
+  */
+final class TestingAppendBaseRowSink(rowTypeInfo: BaseRowTypeInfo, tz: TimeZone)
+  extends AbstractExactlyOnceSink[BaseRow] {
+
+  /** Creates a sink that uses the UTC time zone. */
+  def this(rowTypeInfo: BaseRowTypeInfo) = this(rowTypeInfo, TimeZone.getTimeZone("UTC"))
+
+  def invoke(value: BaseRow): Unit = {
+    val rendered = BaseRowUtil.baseRowToString(value, rowTypeInfo, tz)
+    localResults += rendered
+  }
+
+  /** Returns the rows collected by all subtasks of this sink. */
+  def getAppendResults: List[String] = getResults
+
+}
diff --git
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
new file mode 100644
index 0000000..d3c30b7
--- /dev/null
+++
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+import org.junit.{Before, Rule}
+
+/**
+  * Base class for streaming tests.
+  *
+  * Before each test it clears `StreamTestSink`, creates a stream execution environment
+  * with parallelism 4 and event-time characteristic, optionally enables object reuse,
+  * and creates a [[StreamTableEnvironment]] on top of it. JUnit rules for expected
+  * exceptions and temporary folders are exposed through annotated accessor methods.
+  */
+class StreamingTestBase {
+
+  var env: StreamExecutionEnvironment = _
+  var tEnv: StreamTableEnvironment = _
+  val _tempFolder: TemporaryFolder = new TemporaryFolder()
+  // read in before(): when true, object reuse is enabled on the execution config
+  var enableObjectReuse = true
+  // used for accurate exception information checking.
+  val expectedException: ExpectedException = ExpectedException.none()
+
+  @Rule
+  def thrown: ExpectedException = expectedException
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    StreamTestSink.clear()
+
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    env = streamEnv
+    streamEnv.setParallelism(4)
+    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    if (enableObjectReuse) {
+      streamEnv.getConfig.enableObjectReuse()
+    }
+
+    tEnv = StreamTableEnvironment.create(streamEnv)
+  }
+
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
index 0678e71..5c5909f 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
@@ -28,6 +28,16 @@ import static
org.apache.flink.configuration.ConfigOptions.key;
public class TableConfigOptions {
//
------------------------------------------------------------------------
+ // Source Options
+ //
------------------------------------------------------------------------
+
+ /** Boolean option {@code sql.exec.source.values-input.enabled} controlling values source input; defaults to {@code false}. */
+ public static final ConfigOption<Boolean>
SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED =
+ key("sql.exec.source.values-input.enabled")
+ .defaultValue(false)
+ .withDescription("Whether support
values source input. The reason for disabling this " +
+
"feature is that checkpoint will not work properly when source finished.");
+
+ //
------------------------------------------------------------------------
// Sort Options
//
------------------------------------------------------------------------
@@ -74,4 +84,13 @@ public class TableConfigOptions {
.defaultValue(Long.MIN_VALUE)
.withDescription("MiniBatch allow
latency(ms). Value > 0 means MiniBatch enabled.");
+ //
------------------------------------------------------------------------
+ // STATE BACKEND Options
+ //
------------------------------------------------------------------------
+
+ /** Boolean option {@code sql.exec.statebackend.onheap} stating whether the state backend is on heap; defaults to {@code false}. */
+ public static final ConfigOption<Boolean>
SQL_EXEC_STATE_BACKEND_ON_HEAP =
+ key("sql.exec.statebackend.onheap")
+ .defaultValue(false)
+ .withDescription("Whether the
statebackend is on heap.");
+
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
index f0484f7..4154fe7 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
@@ -117,6 +117,10 @@ public final class GenericRow extends ObjectArrayRow {
this.fields[ordinal] = value;
}
+    /**
+     * Returns the value stored at the given position of this row.
+     *
+     * @param ordinal index into the backing field array
+     * @return the object currently held at {@code ordinal}
+     */
+    public Object getField(int ordinal) {
+        final Object value = fields[ordinal];
+        return value;
+    }
+
public static GenericRow of(Object... values) {
GenericRow row = new GenericRow(values.length);
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
index 043340b..736fbcd 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
@@ -66,4 +66,16 @@ public abstract class GeneratedClass<T> implements
Serializable {
}
return compiledClass;
}
+
+    /**
+     * @return the class name held by this generated class descriptor
+     */
+    public String getClassName() {
+        return this.className;
+    }
+
+    /**
+     * @return the code string held by this generated class descriptor
+     */
+    public String getCode() {
+        return this.code;
+    }
+
+    /**
+     * Returns the references array held by this generated class descriptor.
+     *
+     * <p>The internal array itself is returned, not a copy.
+     *
+     * @return the references array
+     */
+    public Object[] getReferences() {
+        return this.references;
+    }
}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/values/ValuesInputFormat.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/values/ValuesInputFormat.java
new file mode 100644
index 0000000..45ddd4c
--- /dev/null
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/values/ValuesInputFormat.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.values;
+
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedInput;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Non-parallel input format for values sources that delegates to a code-generated
+ * {@link GenericInputFormat}.
+ *
+ * <p>The delegate is instantiated in {@link #open(GenericInputSplit)} from the supplied
+ * {@link GeneratedInput}, using the user code class loader of the runtime context.
+ * {@link #reachedEnd()} and {@link #nextRecord(BaseRow)} forward to that delegate, and
+ * {@link #getProducedType()} reports the row type given at construction time.
+ */
+public class ValuesInputFormat
+        extends GenericInputFormat<BaseRow>
+        implements NonParallelInput, ResultTypeQueryable<BaseRow> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ValuesInputFormat.class);
+
+    /** Generated code of the delegate format; instantiated in {@link #open(GenericInputSplit)}. */
+    private final GeneratedInput<GenericInputFormat<BaseRow>> generatedInput;
+
+    /** Type information returned by {@link #getProducedType()}. */
+    private final BaseRowTypeInfo returnType;
+
+    /** Delegate created in {@link #open(GenericInputSplit)}; {@code null} before that call. */
+    private GenericInputFormat<BaseRow> format;
+
+    /**
+     * @param generatedInput generated input format that produces the rows
+     * @param returnType type information of the produced rows
+     */
+    public ValuesInputFormat(
+            GeneratedInput<GenericInputFormat<BaseRow>> generatedInput,
+            BaseRowTypeInfo returnType) {
+        this.generatedInput = generatedInput;
+        this.returnType = returnType;
+    }
+
+    /**
+     * Instantiates the generated delegate format with the user code class loader.
+     *
+     * <p>NOTE(review): neither {@code super.open(split)} nor the delegate's own
+     * {@code open(...)} is invoked here; confirm that the generated format needs no
+     * further initialization.
+     */
+    @Override
+    public void open(GenericInputSplit split) {
+        // SLF4J only substitutes "{}" placeholders; "$name" / "$code" would be logged
+        // literally and the arguments silently dropped.
+        LOG.debug("Compiling GenericInputFormat: {} \n\n Code:\n{}",
+            generatedInput.getClassName(), generatedInput.getCode());
+        LOG.debug("Instantiating GenericInputFormat.");
+        format = generatedInput.newInstance(getRuntimeContext().getUserCodeClassLoader());
+    }
+
+    /** Forwards to the generated delegate. */
+    @Override
+    public boolean reachedEnd() throws IOException {
+        return format.reachedEnd();
+    }
+
+    /** Forwards to the generated delegate, passing the reuse object through. */
+    @Override
+    public BaseRow nextRecord(BaseRow reuse) throws IOException {
+        return format.nextRecord(reuse);
+    }
+
+    /** Returns the row type information supplied to the constructor. */
+    @Override
+    public TypeInformation<BaseRow> getProducedType() {
+        return returnType;
+    }
+
+}
diff --git
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
index 11e57f8..8778767 100644
---
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
+++
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
@@ -142,4 +142,9 @@ public class BaseRowTypeInfo extends
TupleTypeInfoBase<BaseRow> {
public BaseRowSerializer createSerializer(ExecutionConfig config) {
return new BaseRowSerializer(config, internalTypes);
}
+
+    /**
+     * Returns the internal types of this row type.
+     *
+     * <p>The internal array itself is returned, not a copy.
+     *
+     * @return the internal types array
+     */
+    public InternalType[] getInternalTypes() {
+        return this.internalTypes;
+    }
+
}