This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6333fc7  [FLINK-11975][table-planner-blink] Support running a simple select from values query (#8035)
6333fc7 is described below

commit 6333fc734f730e64c5f96390f345e7e0b1352662
Author: Jing Zhang <[email protected]>
AuthorDate: Tue Mar 26 20:58:09 2019 +0800

    [FLINK-11975][table-planner-blink] Support running a simple select from values query (#8035)
---
 .../flink/table/api/BatchTableEnvironment.scala    |  66 +++++++
 .../flink/table/api/StreamTableEnvironment.scala   | 111 +++++++++++-
 .../apache/flink/table/api/TableEnvironment.scala  |  27 +++
 .../org/apache/flink/table/api/TableImpl.scala     |  19 +-
 .../scala/org/apache/flink/table/api/Types.scala   | 200 +++++++++++++++++++++
 .../table/api/scala/StreamTableEnvironment.scala   |  47 +++++
 .../flink/table/api/scala/TableConversions.scala   | 100 +++++++++++
 .../org/apache/flink/table/api/scala/package.scala |   4 +
 .../flink/table/calcite/FlinkTypeFactory.scala     |   1 +
 .../apache/flink/table/codegen/CodeGenUtils.scala  |   9 +
 .../table/codegen/InputFormatCodeGenerator.scala   |  93 ++++++++++
 .../SinkCodeGenerator.scala}                       |  38 ++--
 .../flink/table/codegen/ValuesCodeGenerator.scala  |  63 +++++++
 .../plan/nodes/physical/batch/BatchExecSink.scala  |  74 +++++++-
 .../nodes/physical/batch/BatchExecValues.scala     |  31 +++-
 .../nodes/physical/stream/StreamExecSink.scala     | 119 +++++++++++-
 .../nodes/physical/stream/StreamExecValues.scala   |  40 ++++-
 .../flink/table/plan/optimize/Optimizer.scala      |   2 +-
 .../table/plan/rules/FlinkBatchRuleSets.scala      |   3 +-
 .../table/plan/rules/FlinkStreamRuleSets.scala     |   3 +-
 .../rules/physical/batch/BatchExecSinkRule.scala   |  54 ++++++
 .../rules/physical/stream/StreamExecSinkRule.scala |  54 ++++++
 ...TableSink.scala => AppendStreamTableSink.scala} |  16 +-
 ...tStreamTableSink.scala => BatchTableSink.scala} |  16 +-
 .../flink/table/sinks/CollectTableSink.scala       |  83 +++++++++
 .../flink/table/sinks/DataStreamTableSink.scala    |  65 +++++++
 ...ableSink.scala => RetractStreamTableSink.scala} |  17 +-
 .../flink/table/sinks/UpsertStreamTableSink.scala  |  70 ++++++++
 .../org/apache/flink/table/util/BaseRowUtil.java   |  89 +++++++++
 .../table/runtime/batch/sql/ValuesITCase.scala}    |  27 +--
 .../table/runtime/stream/sql/ValuesITCase.scala    |  51 ++++++
 .../flink/table/runtime/utils/BatchTestBase.scala  |  85 +++++++++
 .../flink/table/runtime/utils/StreamTestSink.scala | 134 ++++++++++++++
 .../table/runtime/utils/StreamingTestBase.scala    |  55 ++++++
 .../apache/flink/table/api/TableConfigOptions.java |  19 ++
 .../apache/flink/table/dataformat/GenericRow.java  |   4 +
 .../flink/table/generated/GeneratedClass.java      |  12 ++
 .../table/runtime/values/ValuesInputFormat.java    |  75 ++++++++
 .../flink/table/typeutils/BaseRowTypeInfo.java     |   5 +
 39 files changed, 1912 insertions(+), 69 deletions(-)
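
For context, the kind of query this change enables looks roughly like the
following sketch. It assumes a blink-planner Scala StreamTableEnvironment
named tEnv (construction omitted) and a sqlQuery() entry point analogous to
the legacy planner's; the ITCases added in this commit may wire things up
differently.

    import org.apache.flink.table.api.scala._   // implicit Table -> TableConversions
    import org.apache.flink.types.Row

    // a simple "select from values" query, planned via StreamExecValues -> StreamExecSink
    val result = tEnv.sqlQuery(
      "SELECT * FROM (VALUES (1, 'Hello'), (2, 'World')) AS T(a, b)")

    // converted to an append stream through the new DataStreamTableSink path
    result.toAppendStream[Row].print()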

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 95838cf..8991aef 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -17,9 +17,13 @@
  */
 package org.apache.flink.table.api
 
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.transformations.StreamTransformation
 import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.plan.nodes.calcite.LogicalSink
+import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.plan.`trait`.FlinkRelDistributionTraitDef
 import org.apache.flink.table.plan.optimize.{BatchOptimizer, Optimizer}
 import org.apache.flink.table.plan.schema.{BatchTableSourceTable, TableSourceSinkTable, TableSourceTable}
@@ -32,6 +36,8 @@ import org.apache.calcite.plan.ConventionTraitDef
 import org.apache.calcite.rel.RelCollationTraitDef
 import org.apache.calcite.sql.SqlExplainLevel
 
+import _root_.scala.collection.JavaConversions._
+
 /**
  *  A session to construct between [[Table]] and [[DataStream]], its main function is:
   *
@@ -80,6 +86,66 @@ class BatchTableEnvironment(
   }
 
   /**
+    * Merge global job parameters and table config parameters,
+    * and set the merged result to GlobalJobParameters
+    */
+  private def mergeParameters(): Unit = {
+    if (streamEnv != null && streamEnv.getConfig != null) {
+      val parameters = new Configuration()
+      if (config != null && config.getConf != null) {
+        parameters.addAll(config.getConf)
+      }
+
+      if (streamEnv.getConfig.getGlobalJobParameters != null) {
+        streamEnv.getConfig.getGlobalJobParameters.toMap.foreach {
+          kv => parameters.setString(kv._1, kv._2)
+        }
+      }
+
+      streamEnv.getConfig.setGlobalJobParameters(parameters)
+    }
+  }
+
+  /**
+    * Writes a [[Table]] to a [[TableSink]].
+    *
+    * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
+    * [[TableSink]] to write it.
+    *
+    * @param table The [[Table]] to write.
+    * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
+    */
+  override private[table] def writeToSink[T](
+      table: Table,
+      sink: TableSink[T],
+      sinkName: String): Unit = {
+    mergeParameters()
+
+    val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+    val optimizedPlan = optimize(sinkNode)
+    val optimizedNodes = translateNodeDag(Seq(optimizedPlan))
+    require(optimizedNodes.size() == 1)
+    translateToPlan(optimizedNodes.head)
+  }
+
+  /**
+    * Translates a [[BatchExecNode]] plan into a [[StreamTransformation]].
+    * Converts to target type if necessary.
+    *
+    * @param node        The plan to translate.
+    * @return The [[StreamTransformation]] that corresponds to the given node.
+    */
+  private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] = {
+    node match {
+      case node: BatchExecNode[_] => node.translateToPlan(this)
+      case _ =>
+        throw new TableException("Cannot generate BoundedStream due to an 
invalid logical plan. " +
+                                   "This is a bug and should not happen. 
Please file an issue.")
+    }
+  }
+
+  /**
     * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
     * the result of the given [[Table]].
     *
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index f285214..abb82ce 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -18,15 +18,24 @@
 
 package org.apache.flink.table.api
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+import org.apache.flink.runtime.state.memory.MemoryStateBackend
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.transformations.StreamTransformation
 import org.apache.flink.table.calcite.FlinkRelBuilder
-import org.apache.flink.table.plan.`trait`.{AccModeTraitDef, FlinkRelDistributionTraitDef, MiniBatchIntervalTraitDef, UpdateAsRetractionTraitDef}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.calcite.LogicalSink
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
 import org.apache.flink.table.plan.optimize.{Optimizer, StreamOptimizer}
 import org.apache.flink.table.plan.schema._
 import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.plan.`trait`.{AccModeTraitDef, FlinkRelDistributionTraitDef, MiniBatchIntervalTraitDef, UpdateAsRetractionTraitDef}
 import org.apache.flink.table.plan.util.FlinkRelOptUtil
+import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink}
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 
@@ -34,6 +43,8 @@ import org.apache.calcite.plan.ConventionTraitDef
 import org.apache.calcite.rel.RelCollationTraitDef
 import org.apache.calcite.sql.SqlExplainLevel
 
+import _root_.scala.collection.JavaConversions._
+
 /**
   * The base class for stream TableEnvironments.
   *
@@ -60,6 +71,8 @@ abstract class StreamTableEnvironment(
   // the naming pattern for internally registered tables.
   private val internalNamePattern = "^_DataStreamTable_[0-9]+$".r
 
+  private var isConfigMerged: Boolean = false
+
   override def queryConfig: StreamQueryConfig = new StreamQueryConfig
 
   override protected def getOptimizer: Optimizer = new StreamOptimizer(this)
@@ -92,6 +105,102 @@ abstract class StreamTableEnvironment(
   )
 
   /**
+    * Merge global job parameters and table config parameters,
+    * and set the merged result to GlobalJobParameters
+    */
+  private def mergeParameters(): Unit = {
+    if (!isConfigMerged && execEnv != null && execEnv.getConfig != null) {
+      val parameters = new Configuration()
+      if (config != null && config.getConf != null) {
+        parameters.addAll(config.getConf)
+      }
+
+      if (execEnv.getConfig.getGlobalJobParameters != null) {
+        execEnv.getConfig.getGlobalJobParameters.toMap.foreach {
+          kv => parameters.setString(kv._1, kv._2)
+        }
+      }
+      val isHeapState = Option(execEnv.getStateBackend) match {
+        case Some(backend) if backend.isInstanceOf[MemoryStateBackend] ||
+          backend.isInstanceOf[FsStateBackend]=> true
+        case None => true
+        case _ => false
+      }
+      parameters.setBoolean(TableConfigOptions.SQL_EXEC_STATE_BACKEND_ON_HEAP, isHeapState)
+      execEnv.getConfig.setGlobalJobParameters(parameters)
+      isConfigMerged = true
+    }
+  }
+
+  /**
+    * Writes a [[Table]] to a [[TableSink]].
+    *
+    * Internally, the [[Table]] is translated into a [[DataStream]] and handed over to the
+    * [[TableSink]] to write it.
+    *
+    * @param table The [[Table]] to write.
+    * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @tparam T The expected type of the [[DataStream]] which represents the [[Table]].
+    */
+  override private[table] def writeToSink[T](
+      table: Table,
+      sink: TableSink[T],
+      sinkName: String): Unit = {
+    val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+    translateSink(sinkNode)
+  }
+
+  /**
+    * Translates a [[Table]] into a [[DataStream]].
+    *
+    * The transformation involves optimizing the relational expression tree as defined by
+    * Table API calls and / or SQL queries and generating corresponding [[DataStream]] operators.
+    *
+    * @param table               The root node of the relational expression tree.
+    * @param updatesAsRetraction Set to true to encode updates as retraction messages.
+    * @param withChangeFlag      Set to true to emit records with change flags.
+    * @param resultType          The [[org.apache.flink.api.common.typeinfo.TypeInformation[_]] of
+    *                            the resulting [[DataStream]].
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The [[DataStream]] that corresponds to the translated [[Table]].
+    */
+  protected def translateToDataStream[T](
+      table: Table,
+      updatesAsRetraction: Boolean,
+      withChangeFlag: Boolean,
+      resultType: TypeInformation[T]): DataStream[T] = {
+    val sink = new DataStreamTableSink[T](table, resultType, updatesAsRetraction, withChangeFlag)
+    val sinkName = createUniqueTableName()
+    val sinkNode = LogicalSink.create(table.asInstanceOf[TableImpl].getRelNode, sink, sinkName)
+    val transformation = translateSink(sinkNode)
+    new DataStream(execEnv, transformation).asInstanceOf[DataStream[T]]
+  }
+
+  private def translateSink(sink: LogicalSink): StreamTransformation[_] = {
+    mergeParameters()
+
+    val optimizedPlan = optimize(sink)
+    val optimizedNodes = translateNodeDag(Seq(optimizedPlan))
+    require(optimizedNodes.size() == 1)
+    translateToPlan(optimizedNodes.head)
+  }
+
+  /**
+    * Translates a [[StreamExecNode]] plan into a [[StreamTransformation]].
+    *
+    * @param node The plan to translate.
+    * @return The [[StreamTransformation]] of type [[BaseRow]].
+    */
+  private def translateToPlan(node: ExecNode[_, _]): StreamTransformation[_] = {
+    node match {
+      case node: StreamExecNode[_] => node.translateToPlan(this)
+      case _ =>
+        throw new TableException("Cannot generate DataStream due to an invalid 
logical plan. " +
+                                   "This is a bug and should not happen. 
Please file an issue.")
+    }
+  }
+
+  /**
     * Returns the AST of the specified Table API and SQL queries and the execution plan to compute
     * the result of the given [[Table]].
     *
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 4c2bdc4..82fe8e4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.api
 
+import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils.{RowTypeInfo, _}
@@ -25,9 +26,12 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 import org.apache.flink.table.calcite.{FlinkContextImpl, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
 import org.apache.flink.table.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.plan.cost.FlinkCostFactory
+import org.apache.flink.table.plan.nodes.exec.ExecNode
+import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
 import org.apache.flink.table.plan.optimize.Optimizer
 import org.apache.flink.table.plan.schema.RelTable
 import org.apache.flink.table.plan.stats.FlinkStatistic
+import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.sources.TableSource
 import org.apache.flink.types.Row
 
@@ -152,6 +156,18 @@ abstract class TableEnvironment(val config: TableConfig) {
   protected def getOptimizer: Optimizer
 
   /**
+    * Writes a [[Table]] to a [[TableSink]].
+    *
+    * @param table The [[Table]] to write.
+    * @param sink The [[TableSink]] to write the [[Table]] to.
+    * @tparam T The data type that the [[TableSink]] expects.
+    */
+  private[table] def writeToSink[T](
+      table: Table,
+      sink: TableSink[T],
+      sinkName: String = null): Unit
+
+  /**
     * Generates the optimized [[RelNode]] dag from the original relational nodes.
     *
     * @param roots The root nodes of the relational expression tree.
@@ -172,6 +188,17 @@ abstract class TableEnvironment(val config: TableConfig) {
   private[flink] def optimize(root: RelNode): RelNode = optimize(Seq(root)).head
 
   /**
+    * Convert [[org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel]] DAG
+    * to [[ExecNode]] DAG and translate them.
+    */
+  @VisibleForTesting
+  private[flink] def translateNodeDag(rels: Seq[RelNode]): Seq[ExecNode[_, _]] = {
+    require(rels.nonEmpty && rels.forall(_.isInstanceOf[FlinkPhysicalRel]))
+    // convert FlinkPhysicalRel DAG to ExecNode DAG
+    rels.map(_.asInstanceOf[ExecNode[_, _]])
+  }
+
+  /**
     * Registers a [[Table]] under a unique name in the TableEnvironment's catalog.
     * Registered tables can be referenced in SQL queries.
     *
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
index a025f45..4d58908 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/TableImpl.scala
@@ -18,9 +18,14 @@
 
 package org.apache.flink.table.api
 
-import org.apache.calcite.rel.RelNode
+import org.apache.flink.table.calcite.FlinkTypeFactory._
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.`type`.TypeConverters.createInternalTypeInfoFromInternalType
+
+import org.apache.calcite.rel.RelNode
+
+import _root_.scala.collection.JavaConversions._
 
 /**
   * The implementation of the [[Table]].
@@ -35,12 +40,22 @@ import org.apache.flink.table.functions.TemporalTableFunction
   */
 class TableImpl(val tableEnv: TableEnvironment, relNode: RelNode) extends Table {
 
+  private lazy val tableSchema: TableSchema = {
+    val rowType = relNode.getRowType
+    val fieldNames = rowType.getFieldList.map(_.getName)
+    val fieldTypes = rowType.getFieldList map { tp =>
+      val internalType = toInternalType(tp.getType)
+      createInternalTypeInfoFromInternalType(internalType)
+    }
+    new TableSchema(fieldNames.toArray, fieldTypes.toArray)
+  }
+
   /**
     * Returns the Calcite RelNode represent this Table.
     */
   def getRelNode: RelNode = relNode
 
-  override def getSchema: TableSchema = ???
+  override def getSchema: TableSchema = tableSchema
 
   override def printSchema(): Unit = ???
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Types.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Types.scala
new file mode 100644
index 0000000..2e7d81d
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/Types.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation, Types => JTypes}
+import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, ObjectArrayTypeInfo}
+import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
+import org.apache.flink.types.Row
+
+import _root_.java.{lang, math, sql, util}
+
+import _root_.scala.annotation.varargs
+
+/**
+  * This class enumerates all supported types of the Table API & SQL.
+  */
+object Types {
+
+  /**
+    * Returns type information for a Table API string or SQL VARCHAR type.
+    */
+  val STRING: TypeInformation[String] = JTypes.STRING
+
+  /**
+    * Returns type information for a Table API boolean or SQL BOOLEAN type.
+    */
+  val BOOLEAN: TypeInformation[lang.Boolean] = JTypes.BOOLEAN
+
+  /**
+    * Returns type information for a Table API byte or SQL TINYINT type.
+    */
+  val BYTE: TypeInformation[lang.Byte] = JTypes.BYTE
+
+  /**
+    * Returns type information for a Table API short or SQL SMALLINT type.
+    */
+  val SHORT: TypeInformation[lang.Short] = JTypes.SHORT
+
+  /**
+    * Returns type information for a Table API integer or SQL INT/INTEGER type.
+    */
+  val INT: TypeInformation[lang.Integer] = JTypes.INT
+
+  /**
+    * Returns type information for a Table API long or SQL BIGINT type.
+    */
+  val LONG: TypeInformation[lang.Long] = JTypes.LONG
+
+  /**
+    * Returns type information for a Table API float or SQL FLOAT/REAL type.
+    */
+  val FLOAT: TypeInformation[lang.Float] = JTypes.FLOAT
+
+  /**
+    * Returns type information for a Table API integer or SQL DOUBLE type.
+    */
+  val DOUBLE: TypeInformation[lang.Double] = JTypes.DOUBLE
+
+  /**
+    * Returns type information for a Table API big decimal or SQL DECIMAL type.
+    */
+  val DECIMAL: TypeInformation[math.BigDecimal] = JTypes.BIG_DEC
+
+  /**
+    * Returns type information for a Table API SQL date or SQL DATE type.
+    */
+  val SQL_DATE: TypeInformation[sql.Date] = JTypes.SQL_DATE
+
+  /**
+    * Returns type information for a Table API SQL time or SQL TIME type.
+    */
+  val SQL_TIME: TypeInformation[sql.Time] = JTypes.SQL_TIME
+
+  /**
+    * Returns type information for a Table API SQL timestamp or SQL TIMESTAMP type.
+    */
+  val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP
+
+  /**
+    * Returns type information for a Table API interval of months.
+    */
+  val INTERVAL_MONTHS: TypeInformation[lang.Integer] = TimeIntervalTypeInfo.INTERVAL_MONTHS
+
+  /**
+    * Returns type information for a Table API interval milliseconds.
+    */
+  val INTERVAL_MILLIS: TypeInformation[lang.Long] = TimeIntervalTypeInfo.INTERVAL_MILLIS
+
+  /**
+    * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types.
+    *
+    * A row is a variable-length, null-aware composite type for storing multiple values in a
+    * deterministic field order. Every field can be null regardless of the field's type.
+    * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+    * type information whenever a row is used.
+    *
+    * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> 
fields, however, all
+    * row instances must strictly adhere to the schema defined by the type 
info.
+    *
+    * This method generates type information with fields of the given types; the fields have
+    * the default names (f0, f1, f2 ..).
+    *
+    * @param types The types of the row fields, e.g., Types.STRING, Types.INT
+    */
+  @varargs
+  def ROW(types: TypeInformation[_]*): TypeInformation[Row] = {
+    JTypes.ROW(types: _*)
+  }
+
+  /**
+    * Returns type information for [[org.apache.flink.types.Row]] with fields of the given types
+    * and with given names.
+    *
+    * A row is a variable-length, null-aware composite type for storing multiple values in a
+    * deterministic field order. Every field can be null independent of the field's type.
+    * The type of row fields cannot be automatically inferred; therefore, it is required to provide
+    * type information whenever a row is used.
+    *
+    * <p>The schema of rows can have up to <code>Integer.MAX_VALUE</code> 
fields, however, all
+    * row instances must strictly adhere to the schema defined by the type 
info.
+    *
+    * Example use: `Types.ROW(Array("name", "number"), Array(Types.STRING, Types.INT))`.
+    *
+    * @param fieldNames array of field names
+    * @param types      array of field types
+    */
+  def ROW(fieldNames: Array[String], types: Array[TypeInformation[_]]): TypeInformation[Row] = {
+    JTypes.ROW_NAMED(fieldNames, types: _*)
+  }
+
+  /**
+    * Generates type information for an array consisting of Java primitive elements. The elements
+    * do not support null values.
+    *
+    * @param elementType type of the array elements; e.g. Types.INT
+    */
+  def PRIMITIVE_ARRAY(elementType: TypeInformation[_]): TypeInformation[_] = {
+    elementType match {
+      case BOOLEAN =>  PrimitiveArrayTypeInfo.BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO
+      case BYTE =>  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO
+      case SHORT =>  PrimitiveArrayTypeInfo.SHORT_PRIMITIVE_ARRAY_TYPE_INFO
+      case INT =>  PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO
+      case LONG =>  PrimitiveArrayTypeInfo.LONG_PRIMITIVE_ARRAY_TYPE_INFO
+      case FLOAT =>  PrimitiveArrayTypeInfo.FLOAT_PRIMITIVE_ARRAY_TYPE_INFO
+      case DOUBLE =>  PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO
+      case _ =>
+        throw new TableException(s"$elementType cannot be an element of a 
primitive array." +
+            s"Only Java primitive types are supported.")
+    }
+  }
+
+  /**
+    * Generates type information for an array consisting of Java object elements. Null values for
+    * elements are supported.
+    *
+    * @param elementType type of the array elements; e.g. Types.STRING or Types.INT
+    */
+  def OBJECT_ARRAY[E](elementType: TypeInformation[E]): TypeInformation[Array[E]] = {
+    ObjectArrayTypeInfo.getInfoFor(elementType)
+  }
+
+  /**
+    * Generates type information for a Java HashMap. Null values in keys are not supported. An
+    * entry's value can be null.
+    *
+    * @param keyType type of the keys of the map e.g. Types.STRING
+    * @param valueType type of the values of the map e.g. Types.STRING
+    */
+  def MAP[K, V](
+      keyType: TypeInformation[K],
+      valueType: TypeInformation[V]): TypeInformation[util.Map[K, V]] = {
+    new MapTypeInfo(keyType, valueType)
+  }
+
+  /**
+    * Generates type information for a Multiset. A Multiset is baked by a Java HashMap and maps an
+    * arbitrary key to an integer value. Null values in keys are not supported.
+    *
+    * @param elementType type of the elements of the multiset e.g. Types.STRING
+    */
+  def MULTISET[E](elementType: TypeInformation[E]): TypeInformation[util.Map[E, lang.Integer]] = {
+    new MultisetTypeInfo(elementType)
+  }
+}
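
A quick illustration of how the Types helpers above compose; field names f0, f1, ...
are the defaults described in the ROW scaladoc, and nothing here goes beyond the
object defined in this file:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.table.api.Types

    // ROW with the default field names f0, f1
    val row = Types.ROW(Types.STRING, Types.INT)

    // ROW with explicit field names, including a map-valued field
    val named = Types.ROW(
      Array("name", "scores"),
      Array[TypeInformation[_]](Types.STRING, Types.MAP(Types.STRING, Types.INT)))

    // type information for a primitive int[] array
    val ints = Types.PRIMITIVE_ARRAY(Types.INT)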
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 04df0cf..136ea4d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -17,6 +17,10 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.asScalaStream
+import org.apache.flink.streaming.api.scala.{createTypeInformation => _, _}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.{Table, TableConfig, TableEnvironment}
 
@@ -127,6 +131,49 @@ class StreamTableEnvironment @deprecated(
     registerDataStreamInternal(name, dataStream.javaStream, exprs)
   }
 
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toAppendStream[T: TypeInformation](table: Table): DataStream[T] = {
+    val returnType = createTypeInformation[T]
+    asScalaStream(translateToDataStream[T](
+      table,
+      updatesAsRetraction = false,
+      withChangeFlag = false,
+      returnType))
+  }
+
+  /**
+    * Converts the given [[Table]] into a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    * @param table The [[Table]] to convert.
+    * @tparam T The type of the requested data type.
+    * @return The converted [[DataStream]].
+    */
+  def toRetractStream[T: TypeInformation](table: Table): DataStream[(Boolean, T)] = {
+    val returnType = createTypeInformation[(Boolean, T)]
+    asScalaStream(translateToDataStream[(Boolean, T)](
+      table,
+      updatesAsRetraction = true,
+      withChangeFlag = true,
+      returnType))
+  }
 
 }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
new file mode 100644
index 0000000..d565c85
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/TableConversions.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.table.api.scala.{StreamTableEnvironment => 
ScalaStreamTableEnv}
+import org.apache.flink.table.api.{Table, TableException, TableImpl}
+
+/**
+  * Holds methods to convert a [[Table]] into a [[DataSet]] or a [[DataStream]].
+  *
+  * @param table The table to convert.
+  */
+class TableConversions(table: Table) {
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a 
specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * NOTE: This method only supports conversion of append-only tables. In order to make this
+    * more explicit in the future, please use [[toAppendStream()]] instead.
+    * If add and retract messages are required, use [[toRetractStream()]].
+    *
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  @deprecated("This method only supports conversion of append-only tables. In 
order to make this" +
+      " more explicit in the future, please use toAppendStream() instead.")
+  def toDataStream[T: TypeInformation]: DataStream[T] = toAppendStream
+
+  /**
+    * Converts the given [[Table]] into an append [[DataStream]] of a specified type.
+    *
+    * The [[Table]] must only have insert (append) changes. If the [[Table]] is also modified
+    * by update or delete changes, the conversion will fail.
+    *
+    * The fields of the [[Table]] are mapped to [[DataStream]] fields as follows:
+    * - [[org.apache.flink.types.Row]] and Scala Tuple types: Fields are mapped by position, field
+    * types must match.
+    * - POJO [[DataStream]] types: Fields are mapped by field name, field types must match.
+    *
+    * @tparam T The type of the resulting [[DataStream]].
+    * @return The converted [[DataStream]].
+    */
+  def toAppendStream[T: TypeInformation]: DataStream[T] = {
+    table.asInstanceOf[TableImpl].tableEnv match {
+      case tEnv: ScalaStreamTableEnv =>
+        tEnv.toAppendStream(table)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+            "can be converted to Scala DataStreams.")
+    }
+  }
+
+  /** Converts the [[Table]] to a [[DataStream]] of add and retract messages.
+    * The message will be encoded as [[Tuple2]]. The first field is a [[Boolean]] flag,
+    * the second field holds the record of the specified type [[T]].
+    *
+    * A true [[Boolean]] flag indicates an add message, a false flag indicates a retract message.
+    *
+    */
+  def toRetractStream[T: TypeInformation]: DataStream[(Boolean, T)] = {
+    table.asInstanceOf[TableImpl].tableEnv match {
+      case tEnv: ScalaStreamTableEnv =>
+        tEnv.toRetractStream(table)
+      case _ =>
+        throw new TableException(
+          "Only tables that originate from Scala DataStreams " +
+              "can be converted to Scala DataStreams.")
+    }
+  }
+
+}
+
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
index 20fbe8e..eb2ff27 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -60,6 +60,10 @@ import _root_.scala.language.implicitConversions
   */
 package object scala {
 
+  implicit def table2TableConversions(table: Table): TableConversions = {
+    new TableConversions(table)
+  }
+
   implicit def dataStream2DataStreamConversions[T](set: DataStream[T]): DataStreamConversions[T] = {
     new DataStreamConversions[T](set, set.dataType)
   }
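
Together with the TableConversions class above, this implicit makes the conversions
available directly on Table values. A minimal sketch, assuming `table` originates
from the blink planner's Scala StreamTableEnvironment (otherwise TableConversions
throws a TableException):

    import org.apache.flink.table.api.scala._   // brings table2TableConversions into scope
    import org.apache.flink.types.Row

    // append-only results
    val appendStream = table.toAppendStream[Row]

    // results carrying a (Boolean, Row) change flag: true = add, false = retract
    val retractStream = table.toRetractStream[Row]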
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 43826c0..81fea7d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -21,6 +21,7 @@ package org.apache.flink.table.calcite
 import org.apache.flink.table.`type`._
 import org.apache.flink.table.api.{TableException, TableSchema}
 import org.apache.flink.table.plan.schema.{GenericRelDataType, _}
+
 import org.apache.calcite.avatica.util.TimeUnit
 import org.apache.calcite.jdbc.JavaTypeFactoryImpl
 import org.apache.calcite.rel.`type`._
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
index b27280e..4e7003d 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGenUtils.scala
@@ -25,6 +25,7 @@ import org.apache.flink.table.`type`._
 import org.apache.flink.table.dataformat.DataFormatConverters.IdentityConverter
 import org.apache.flink.table.dataformat.{Decimal, _}
 import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.types.Row
 
 import java.lang.reflect.Method
 import java.lang.{Boolean => JBoolean, Byte => JByte, Character => JChar, 
Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Short => 
JShort}
@@ -163,6 +164,14 @@ object CodeGenUtils {
     case _ => "null"
   }
 
+  /**
+    * If the class is internally compatible, no data structure converter is needed.
+    * clazz != classOf[Row] because Row can only be inferred as GenericType[Row].
+    */
+  def isInternalClass(clazz: Class[_], t: TypeInformation[_]): Boolean =
+    clazz != classOf[Object] && clazz != classOf[Row] &&
+      (classOf[BaseRow].isAssignableFrom(clazz) || clazz == t.getTypeClass)
+
   // -------------------------- Method & Enum 
---------------------------------------
 
   def qualifyMethod(method: Method): String =
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
new file mode 100644
index 0000000..7a41f13
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/InputFormatCodeGenerator.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.codegen
+
+import org.apache.flink.api.common.io.GenericInputFormat
+import org.apache.flink.table.codegen.CodeGenUtils.newName
+import org.apache.flink.table.codegen.Indenter.toISC
+import org.apache.flink.table.dataformat.GenericRow
+import org.apache.flink.table.generated.GeneratedInput
+import org.apache.flink.table.`type`.InternalType
+
+/**
+  * A code generator for generating Flink [[GenericInputFormat]]s.
+  */
+object InputFormatCodeGenerator {
+
+  /**
+    * Generates a values input format that can be passed to Java compiler.
+    *
+    * @param ctx The code generator context
+    * @param name Class name of the input format. Must not be unique but has to be a
+    *             valid Java class identifier.
+    * @param records code for creating records
+    * @param returnType expected return type
+    * @param outRecordTerm term of the output
+    * @tparam T Return type of the Flink Function.
+    * @return instance of GeneratedFunction
+    */
+  def generateValuesInputFormat[T](
+      ctx: CodeGeneratorContext,
+      name: String,
+      records: Seq[String],
+      returnType: InternalType,
+      outRecordTerm: String = CodeGenUtils.DEFAULT_OUT_RECORD_TERM,
+      outRecordWriterTerm: String = CodeGenUtils.DEFAULT_OUT_RECORD_WRITER_TERM)
+    : GeneratedInput[GenericInputFormat[T]] = {
+    val funcName = newName(name)
+
+    ctx.addReusableOutputRecord(returnType, classOf[GenericRow], outRecordTerm,
+                                Some(outRecordWriterTerm))
+
+    val funcCode = j"""
+      public class $funcName extends ${classOf[GenericInputFormat[_]].getCanonicalName} {
+
+        private int nextIdx = 0;
+
+        ${ctx.reuseMemberCode()}
+
+        public $funcName(Object[] references) throws Exception {
+          ${ctx.reuseInitCode()}
+        }
+
+        @Override
+        public boolean reachedEnd() throws java.io.IOException {
+          return nextIdx >= ${records.length};
+        }
+
+        @Override
+        public Object nextRecord(Object reuse) {
+          switch (nextIdx) {
+            ${records.zipWithIndex.map { case (r, i) =>
+              s"""
+                 |case $i:
+                 |  $r
+                 |break;
+                       """.stripMargin
+            }.mkString("\n")}
+          }
+          nextIdx++;
+          return $outRecordTerm;
+        }
+      }
+    """.stripMargin
+
+    new GeneratedInput(funcName, funcCode, ctx.references.toArray)
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SinkCodeGenerator.scala
similarity index 51%
copy from flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
copy to flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SinkCodeGenerator.scala
index 059a971..f51803f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/SinkCodeGenerator.scala
@@ -16,30 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.plan.nodes.physical.batch
+package org.apache.flink.table.codegen
 
-import org.apache.flink.table.plan.nodes.calcite.Sink
-import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.api.common.functions.InvalidTypesException
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.table.sinks.{DataStreamTableSink, TableSink}
 
-import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
-import org.apache.calcite.rel.RelNode
+object SinkCodeGenerator {
 
-import java.util
-
-/**
-  * Batch physical RelNode to to write data into an external sink defined by a [[TableSink]].
-  */
-class BatchExecSink[T](
-    cluster: RelOptCluster,
-    traitSet: RelTraitSet,
-    inputRel: RelNode,
-    sink: TableSink[T],
-    sinkName: String)
-  extends Sink(cluster, traitSet, inputRel, sink, sinkName)
-  with BatchPhysicalRel {
-
-  override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
-    new BatchExecSink(cluster, traitSet, inputs.get(0), sink, sinkName)
+  private[flink] def extractTableSinkTypeClass(sink: TableSink[_]): Class[_] = {
+    try {
+      sink match {
+        // DataStreamTableSink has no generic class, so we need the output type to obtain the type class.
+        case sink: DataStreamTableSink[_] => sink.getOutputType.getTypeClass
+        case _ => TypeExtractor.createTypeInfo(sink, classOf[TableSink[_]], 
sink.getClass, 0)
+                  .getTypeClass.asInstanceOf[Class[_]]
+      }
+    } catch {
+      case _: InvalidTypesException =>
+        classOf[Object]
+    }
   }
 
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala
new file mode 100644
index 0000000..c207bd6
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/codegen/ValuesCodeGenerator.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.codegen
+
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.dataformat.{BaseRow, GenericRow}
+import org.apache.flink.table.runtime.values.ValuesInputFormat
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.RexLiteral
+
+import com.google.common.collect.ImmutableList
+
+import scala.collection.JavaConversions._
+
+object ValuesCodeGenerator {
+
+  def generatorInputFormat(
+    tableEnv: TableEnvironment,
+    rowType: RelDataType,
+    tuples: ImmutableList[ImmutableList[RexLiteral]],
+    description: String): ValuesInputFormat = {
+    val config = tableEnv.getConfig
+    val outputType = FlinkTypeFactory.toInternalRowType(rowType)
+
+    val ctx = CodeGeneratorContext(config)
+    val exprGenerator = new ExprCodeGenerator(ctx, false)
+    // generate code for every record
+    val generatedRecords = tuples.map { r =>
+      exprGenerator.generateResultExpression(
+        r.map(exprGenerator.generateExpression), outputType, classOf[GenericRow])
+    }
+
+    // generate input format
+    val generatedFunction = InputFormatCodeGenerator.generateValuesInputFormat[BaseRow](
+      ctx,
+      description,
+      generatedRecords.map(_.code),
+      outputType)
+
+    val baseRowTypeInfo = new BaseRowTypeInfo(outputType.getFieldTypes, outputType.getFieldNames)
+    new ValuesInputFormat(generatedFunction, baseRowTypeInfo)
+  }
+
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
index 059a971..35eab30 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
@@ -18,14 +18,24 @@
 
 package org.apache.flink.table.plan.nodes.physical.batch
 
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{BatchTableEnvironment, TableEnvironment, TableException}
+import org.apache.flink.table.codegen.CodeGenUtils
+import org.apache.flink.table.codegen.SinkCodeGenerator._
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.plan.nodes.calcite.Sink
-import org.apache.flink.table.sinks.TableSink
+import org.apache.flink.table.sinks.{BatchTableSink, DataStreamTableSink, TableSink}
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 
 import java.util
 
+import scala.collection.JavaConversions._
+
 /**
   * Batch physical RelNode to to write data into an external sink defined by a [[TableSink]].
   */
@@ -36,10 +46,70 @@ class BatchExecSink[T](
     sink: TableSink[T],
     sinkName: String)
   extends Sink(cluster, traitSet, inputRel, sink, sinkName)
-  with BatchPhysicalRel {
+  with BatchPhysicalRel
+  with BatchExecNode[Any] {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
     new BatchExecSink(cluster, traitSet, inputs.get(0), sink, sinkName)
   }
 
+  //~ ExecNode methods -----------------------------------------------------------
+
+  /**
+    * For a sink operator, the records will not pass through it, so its DamBehavior is FULL_DAM.
+    *
+    * @return Returns [[DamBehavior]] of Sink.
+    */
+  override def getDamBehavior: DamBehavior = DamBehavior.FULL_DAM
+
+  override def getInputNodes: util.List[ExecNode[BatchTableEnvironment, _]] = {
+    List(getInput.asInstanceOf[ExecNode[BatchTableEnvironment, _]])
+  }
+
+  override protected def translateToPlanInternal(
+      tableEnv: BatchTableEnvironment): StreamTransformation[Any] = {
+    val resultTransformation = sink match {
+      case batchTableSink: BatchTableSink[T] =>
+        val transformation = translateToStreamTransformation(withChangeFlag = false, tableEnv)
+        val boundedStream = new DataStream(tableEnv.streamEnv, transformation)
+        batchTableSink.emitBoundedStream(
+          boundedStream, tableEnv.getConfig, tableEnv.streamEnv.getConfig).getTransformation
+
+      case streamTableSink: DataStreamTableSink[T] =>
+        // In case of table to bounded stream through BatchTableEnvironment#toBoundedStream, we
+        // insert a DataStreamTableSink then wrap it as a LogicalSink, there is no real batch table
+        // sink, so we do not need to invoke TableSink#emitBoundedStream and set resource, just a
+        // translation to StreamTransformation is ok.
+        translateToStreamTransformation(withChangeFlag = streamTableSink.withChangeFlag, tableEnv)
+
+      case _ =>
+        throw new TableException("Only Support BatchTableSink or 
DataStreamTableSink!")
+    }
+    resultTransformation.asInstanceOf[StreamTransformation[Any]]
+  }
+
+  private def translateToStreamTransformation(
+      withChangeFlag: Boolean,
+      tableEnv: BatchTableEnvironment): StreamTransformation[T] = {
+    val resultType = sink.getOutputType
+    TableEnvironment.validateType(resultType)
+    val inputNode = getInputNodes.get(0)
+    inputNode match {
+      // Sink's input must be BatchExecNode[BaseRow] now.
+      case node: BatchExecNode[BaseRow] =>
+        val plan = node.translateToPlan(tableEnv)
+        // TODO support SinkConversion after FLINK-11974 is done
+        val typeClass = extractTableSinkTypeClass(sink)
+        if (CodeGenUtils.isInternalClass(typeClass, resultType)) {
+          plan.asInstanceOf[StreamTransformation[T]]
+        } else {
+          throw new TableException(s"Not support SinkConvention now.")
+        }
+      case _ =>
+        throw new TableException("Cannot generate BoundedStream due to an 
invalid logical plan. " +
+                                   "This is a bug and should not happen. 
Please file an issue.")
+    }
+  }
+
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
index 00f47827..ad312f2 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecValues.scala
@@ -18,13 +18,21 @@
 
 package org.apache.flink.table.plan.nodes.physical.batch
 
-import com.google.common.collect.ImmutableList
+import org.apache.flink.runtime.operators.DamBehavior
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.BatchTableEnvironment
+import org.apache.flink.table.codegen.ValuesCodeGenerator
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
+
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rel.{RelNode, RelWriter}
 import org.apache.calcite.rex.RexLiteral
 
+import com.google.common.collect.ImmutableList
+
 import java.util
 
 import scala.collection.JavaConversions._
@@ -38,7 +46,8 @@ class BatchExecValues(
     tuples: ImmutableList[ImmutableList[RexLiteral]],
     outputRowType: RelDataType)
   extends Values(cluster, outputRowType, tuples, traitSet)
-  with BatchPhysicalRel {
+  with BatchPhysicalRel
+  with BatchExecNode[BaseRow] {
 
   override def deriveRowType(): RelDataType = outputRowType
 
@@ -51,5 +60,23 @@ class BatchExecValues(
       .item("values", getRowType.getFieldNames.toList.mkString(", "))
   }
 
+  //~ ExecNode methods -----------------------------------------------------------
+
+  override def getDamBehavior: DamBehavior = DamBehavior.PIPELINED
+
+  override protected def translateToPlanInternal(
+      tableEnv: BatchTableEnvironment): StreamTransformation[BaseRow] = {
+    val inputFormat = ValuesCodeGenerator.generatorInputFormat(
+      tableEnv,
+      getRowType,
+      tuples,
+      getRelTypeName)
+    tableEnv.streamEnv.createInput(inputFormat, inputFormat.getProducedType).getTransformation
+  }
+
+  override def getInputNodes: util.List[ExecNode[BatchTableEnvironment, _]] = {
+    new util.ArrayList[ExecNode[BatchTableEnvironment, _]]()
+  }
+
 }
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
index 7b74b67..c9adccc 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
@@ -18,7 +18,17 @@
 
 package org.apache.flink.table.plan.nodes.physical.stream
 
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{StreamTableEnvironment, Table, TableException}
+import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.codegen.CodeGenUtils
+import org.apache.flink.table.codegen.SinkCodeGenerator.extractTableSinkTypeClass
+import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.calcite.Sink
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+import org.apache.flink.table.plan.`trait`.{AccMode, AccModeTraitDef}
+import org.apache.flink.table.plan.util.UpdatingPlanChecker
 import org.apache.flink.table.sinks._
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -26,6 +36,8 @@ import org.apache.calcite.rel.RelNode
 
 import java.util
 
+import scala.collection.JavaConversions._
+
 /**
   * Stream physical RelNode to to write data into an external sink defined by a [[TableSink]].
   */
@@ -36,12 +48,13 @@ class StreamExecSink[T](
     sink: TableSink[T],
     sinkName: String)
   extends Sink(cluster, traitSet, inputRel, sink, sinkName)
-  with StreamPhysicalRel {
+  with StreamPhysicalRel
+  with StreamExecNode[Any] {
 
   override def producesUpdates: Boolean = false
 
   override def needsUpdatesAsRetraction(input: RelNode): Boolean =
-    sink.isInstanceOf[BaseRetractStreamTableSink[_]]
+    sink.isInstanceOf[RetractStreamTableSink[_]]
 
   override def consumesRetractions: Boolean = false
 
@@ -53,5 +66,107 @@ class StreamExecSink[T](
     new StreamExecSink(cluster, traitSet, inputs.get(0), sink, sinkName)
   }
 
+  //~ ExecNode methods -----------------------------------------------------------
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = 
{
+    List(getInput.asInstanceOf[ExecNode[StreamTableEnvironment, _]])
+  }
+
+  override protected def translateToPlanInternal(
+      tableEnv: StreamTableEnvironment): StreamTransformation[Any] = {
+    val resultTransformation = sink match {
+      case streamTableSink: StreamTableSink[T] =>
+        val transformation = streamTableSink match {
+          case _: RetractStreamTableSink[T] =>
+            translateToStreamTransformation(withChangeFlag = true, tableEnv)
+
+          case upsertSink: UpsertStreamTableSink[T] =>
+            // check for append only table
+            val isAppendOnlyTable = UpdatingPlanChecker.isAppendOnly(this)
+            upsertSink.setIsAppendOnly(isAppendOnlyTable)
+            translateToStreamTransformation(withChangeFlag = true, tableEnv)
+
+          case _: AppendStreamTableSink[T] =>
+            // verify table is an insert-only (append-only) table
+            if (!UpdatingPlanChecker.isAppendOnly(this)) {
+              throw new TableException(
+                "AppendStreamTableSink requires that Table has only insert 
changes.")
+            }
+
+            val accMode = 
this.getTraitSet.getTrait(AccModeTraitDef.INSTANCE).getAccMode
+            if (accMode == AccMode.AccRetract) {
+              throw new TableException(
+                "AppendStreamTableSink can not be used to output retraction 
messages.")
+            }
+            translateToStreamTransformation(withChangeFlag = false, tableEnv)
+
+          case _ =>
+            throw new TableException(
+              "Stream Tables can only be emitted by AppendStreamTableSink, " +
+                "RetractStreamTableSink, or UpsertStreamTableSink.")
+        }
+        val dataStream = new DataStream(tableEnv.execEnv, transformation)
+        streamTableSink.emitDataStream(dataStream).getTransformation
+
+      case streamTableSink: DataStreamTableSink[_] =>
+        // When a table is converted to a stream through
+        // BatchTableEnvironment#translateToDataStream, we insert a DataStreamTableSink and
+        // wrap it as a LogicalSink. There is no real batch table sink, so we do not need to
+        // invoke TableSink#emitBoundedStream or set resources;
+        // a translation to a StreamTransformation is enough.
+        translateToStreamTransformation(streamTableSink.withChangeFlag, 
tableEnv)
+
+      case _ =>
+        throw new TableException("Only Support StreamTableSink or 
DataStreamTableSink!")
+    }
+    resultTransformation.asInstanceOf[StreamTransformation[Any]]
+  }
+
+  /**
+    * Translates a logical [[RelNode]] into a [[StreamTransformation]].
+    *
+    * @param withChangeFlag Set to true to emit records with change flags.
+    * @return The [[StreamTransformation]] that corresponds to the translated 
[[Table]].
+    */
+  private def translateToStreamTransformation(
+      withChangeFlag: Boolean,
+      tableEnv: StreamTableEnvironment): StreamTransformation[T] = {
+    val inputNode = getInput
+    // if no change flags are requested, verify table is an insert-only 
(append-only) table.
+    if (!withChangeFlag && !UpdatingPlanChecker.isAppendOnly(inputNode)) {
+      throw new TableException(
+        "Table is not an append-only table. " +
+          "Use the toRetractStream() in order to handle add and retract 
messages.")
+    }
+
+    // get BaseRow plan
+    val parTransformation = inputNode match {
+      // Sink's input must be StreamExecNode[BaseRow] now.
+      case node: StreamExecNode[BaseRow] =>
+        node.translateToPlan(tableEnv)
+      case _ =>
+        throw new TableException("Cannot generate DataStream due to an invalid 
logical plan. " +
+                                   "This is a bug and should not happen. 
Please file an issue.")
+    }
+    val logicalType = inputNode.getRowType
+    val rowtimeFields = logicalType.getFieldList
+                        .filter(f => 
FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+    if (rowtimeFields.size > 1) {
+      throw new TableException(
+        s"Found more than one rowtime field: 
[${rowtimeFields.map(_.getName).mkString(", ")}] in " +
+          s"the table that should be converted to a DataStream.\n" +
+          s"Please select the rowtime field that should be used as event-time 
timestamp for the " +
+          s"DataStream by casting all other fields to TIMESTAMP.")
+    }
+    val resultType = sink.getOutputType
+    // TODO support SinkConversion after FLINK-11974 is done
+    val typeClass = extractTableSinkTypeClass(sink)
+    if (CodeGenUtils.isInternalClass(typeClass, resultType)) {
+      parTransformation.asInstanceOf[StreamTransformation[T]]
+    } else {
+      throw new TableException(s"Not support SinkConvention now.")
+    }
+
+  }
 
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
index b87080a..4b63f18 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecValues.scala
@@ -18,13 +18,22 @@
 
 package org.apache.flink.table.plan.nodes.physical.stream
 
-import com.google.common.collect.ImmutableList
+import org.apache.flink.streaming.api.transformations.StreamTransformation
+import org.apache.flink.table.api.{StreamTableEnvironment, TableConfigOptions, 
TableException}
+import org.apache.flink.table.codegen.ValuesCodeGenerator
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.plan.nodes.exec.{ExecNode, StreamExecNode}
+
 import org.apache.calcite.plan._
+import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rel.core.Values
 import org.apache.calcite.rex.RexLiteral
 
+import com.google.common.collect.ImmutableList
+
+import java.util
+
 /**
   * Stream physical RelNode for [[Values]].
   */
@@ -34,7 +43,8 @@ class StreamExecValues(
     tuples: ImmutableList[ImmutableList[RexLiteral]],
     outputRowType: RelDataType)
   extends Values(cluster, outputRowType, tuples, traitSet)
-  with StreamPhysicalRel {
+  with StreamPhysicalRel
+  with StreamExecNode[BaseRow] {
 
   override def producesUpdates: Boolean = false
 
@@ -51,4 +61,28 @@ class StreamExecValues(
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): 
RelNode = {
     new StreamExecValues(cluster, traitSet, getTuples, outputRowType)
   }
+
+  //~ ExecNode methods -----------------------------------------------------------
+
+  override protected def translateToPlanInternal(
+      tableEnv: StreamTableEnvironment): StreamTransformation[BaseRow] = {
+    if (tableEnv.getConfig.getConf.getBoolean(
+      TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED)) {
+      val inputFormat = ValuesCodeGenerator.generatorInputFormat(
+        tableEnv,
+        getRowType,
+        tuples,
+        getRelTypeName)
+      tableEnv.execEnv.createInput(inputFormat, 
inputFormat.getProducedType).getTransformation
+    } else {
+      // enable this feature once the runtime supports checkpointing after sources have finished
+      throw new TableException("Values source input is not supported 
currently. Probably " +
+        "there is a where condition which always returns false in your query.")
+    }
+  }
+
+  override def getInputNodes: util.List[ExecNode[StreamTableEnvironment, _]] = 
{
+    new util.ArrayList[ExecNode[StreamTableEnvironment, _]]()
+  }
+
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
index e54c9d2..97ff92b 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/Optimizer.scala
@@ -31,7 +31,7 @@ trait Optimizer {
     * <p>NOTES:
     * <p>1. The reused node in result DAG will be converted to the same 
RelNode.
     * <p>2. If a root node requires retract changes on Stream, the node should 
be
-    * a [[org.apache.flink.table.sinks.BaseRetractStreamTableSink]] or
+    * a [[org.apache.flink.table.sinks.RetractStreamTableSink]] or
     * a regular node with 
[[org.apache.flink.table.plan.trait.UpdateAsRetractionTrait]]
     * which `updateAsRetraction` is true.
     *
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index e2bd710..ac05926 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -209,6 +209,7 @@ object FlinkBatchRuleSets {
     BatchExecScanTableSourceRule.INSTANCE,
     BatchExecValuesRule.INSTANCE,
     BatchExecCalcRule.INSTANCE,
-    BatchExecUnionRule.INSTANCE
+    BatchExecUnionRule.INSTANCE,
+    BatchExecSinkRule.INSTANCE
   )
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index a32c926..e8b96ef 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -215,7 +215,8 @@ object FlinkStreamRuleSets {
     StreamExecTableSourceScanRule.INSTANCE,
     StreamExecValuesRule.INSTANCE,
     StreamExecCalcRule.INSTANCE,
-    StreamExecUnionRule.INSTANCE
+    StreamExecUnionRule.INSTANCE,
+    StreamExecSinkRule.INSTANCE
   )
 
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
new file mode 100644
index 0000000..93aaeb6
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/batch/BatchExecSinkRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.batch
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+import org.apache.flink.table.plan.nodes.physical.batch.BatchExecSink
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.RelNode
+
+class BatchExecSinkRule extends ConverterRule(
+    classOf[FlinkLogicalSink],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.BATCH_PHYSICAL,
+    "BatchExecSinkRule") {
+
+  def convert(rel: RelNode): RelNode = {
+    val sinkNode = rel.asInstanceOf[FlinkLogicalSink]
+    val newTrait = rel.getTraitSet.replace(FlinkConventions.BATCH_PHYSICAL)
+    // TODO Take PartitionableSink into consideration after FLINK-11993 is done
+    val newInput = RelOptRule.convert(sinkNode.getInput, 
FlinkConventions.BATCH_PHYSICAL)
+
+    new BatchExecSink(
+      rel.getCluster,
+      newTrait,
+      newInput,
+      sinkNode.sink,
+      sinkNode.sinkName)
+  }
+}
+
+object BatchExecSinkRule {
+
+  val INSTANCE: RelOptRule = new BatchExecSinkRule
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
new file mode 100644
index 0000000..5ae6ead
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/physical/stream/StreamExecSinkRule.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.physical.stream
+
+import org.apache.flink.table.plan.nodes.FlinkConventions
+import org.apache.flink.table.plan.nodes.logical.FlinkLogicalSink
+import org.apache.flink.table.plan.nodes.physical.stream.StreamExecSink
+
+import org.apache.calcite.plan.RelOptRule
+import org.apache.calcite.rel.convert.ConverterRule
+import org.apache.calcite.rel.RelNode
+
+class StreamExecSinkRule extends ConverterRule(
+    classOf[FlinkLogicalSink],
+    FlinkConventions.LOGICAL,
+    FlinkConventions.STREAM_PHYSICAL,
+    "StreamExecSinkRule") {
+
+  def convert(rel: RelNode): RelNode = {
+    val sinkNode = rel.asInstanceOf[FlinkLogicalSink]
+    val newTrait = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
+    // TODO Take PartitionableSink into consideration after FLINK-11993 is done
+    val newInput = RelOptRule.convert(sinkNode.getInput, 
FlinkConventions.STREAM_PHYSICAL)
+
+    new StreamExecSink(
+      rel.getCluster,
+      newTrait,
+      newInput,
+      sinkNode.sink,
+      sinkNode.sinkName)
+  }
+}
+
+object StreamExecSinkRule {
+
+  val INSTANCE: RelOptRule = new StreamExecSinkRule
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
similarity index 67%
copy from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
copy to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
index 82b9590..d0c8807 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/AppendStreamTableSink.scala
@@ -18,17 +18,15 @@
 
 package org.apache.flink.table.sinks
 
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.Table
 
 /**
-  * Defines an external [[TableSink]] to emit a streaming [[Table]] with 
insert, update, and delete
-  * changes.
+  * Defines an external [[TableSink]] to emit a streaming [[Table]] with only 
insert changes.
   *
-  * @tparam T Type of records that this [[TableSink]] expects and supports.
+  * If the [[Table]] is also modified by update or delete changes, a
+  * [[org.apache.flink.table.api.TableException]] will be thrown.
+  *
+  * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and 
supports.
   */
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
-
-  /** Emits the DataStream. */
-  def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
-}
+trait AppendStreamTableSink[T] extends StreamTableSink[T]
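For illustration, a minimal sketch of a sink built on the new trait. The class name PrintAppendSink is hypothetical; it assumes StreamTableSink declares emitDataStream returning a DataStreamSink (as its use in StreamExecSink above suggests) and that TableSinkBase supplies the configured field names and types.

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.table.sinks.{AppendStreamTableSink, TableSinkBase}
import org.apache.flink.types.Row

// Hypothetical append-only sink that simply prints insert records.
class PrintAppendSink extends TableSinkBase[Row] with AppendStreamTableSink[Row] {

  // Row type derived from the fields passed in via configure().
  override def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes: _*)

  // Invoked by StreamExecSink once the plan has been verified to be append-only.
  override def emitDataStream(dataStream: DataStream[Row]): DataStreamSink[_] =
    dataStream.print()

  override protected def copy: TableSinkBase[Row] = new PrintAppendSink
}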
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
similarity index 69%
copy from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
copy to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
index 82b9590..8ab44b4 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
@@ -18,17 +18,19 @@
 
 package org.apache.flink.table.sinks
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api.Table
+import org.apache.flink.table.api._
 
-/**
-  * Defines an external [[TableSink]] to emit a streaming [[Table]] with 
insert, update, and delete
-  * changes.
+/** Defines an external [[TableSink]] to emit a batch [[Table]].
   *
-  * @tparam T Type of records that this [[TableSink]] expects and supports.
+  * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and 
supports.
   */
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
+trait BatchTableSink[T] extends TableSink[T] {
 
   /** Emits the DataStream. */
-  def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
+  def emitBoundedStream(
+      boundedStream: DataStream[T],
+      tableConfig: TableConfig,
+      executionConfig: ExecutionConfig): DataStreamSink[_]
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
new file mode 100644
index 0000000..27a7d49
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator
+import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
+import org.apache.flink.table.api._
+import org.apache.flink.types.Row
+
+/**
+  * A simple [[TableSink]] to emit data as T to a collection.
+  */
+class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => 
TypeInformation[T]))
+  extends TableSinkBase[T] with BatchTableSink[T] {
+
+  private var collectOutputFormat: CollectOutputFormat[T] = _
+
+  override def emitBoundedStream(
+      boundedStream: DataStream[T],
+      tableConfig: TableConfig,
+      executionConfig: ExecutionConfig): DataStreamSink[T] = {
+    boundedStream.writeUsingOutputFormat(collectOutputFormat)
+    .name("collect")
+  }
+
+  override protected def copy: TableSinkBase[T] = {
+    new CollectTableSink(produceOutputType)
+  }
+
+  override def getOutputType: TypeInformation[T] = {
+    produceOutputType(getFieldTypes)
+  }
+
+  def init(typeSerializer: TypeSerializer[T], id: String): Unit = {
+    collectOutputFormat = new CollectOutputFormat(id, typeSerializer)
+  }
+}
+
+class CollectOutputFormat[T](id: String, typeSerializer: TypeSerializer[T])
+  extends RichOutputFormat[T] {
+
+  private var accumulator: SerializedListAccumulator[T] = _
+
+  override def writeRecord(record: T): Unit = {
+    accumulator.add(record, typeSerializer)
+  }
+
+  override def configure(parameters: Configuration): Unit = {
+  }
+
+  override def close(): Unit = {
+    // Important: should only be added in close method to minimize traffic of 
accumulators
+    getRuntimeContext.addAccumulator(id, accumulator)
+  }
+
+  override def open(taskNumber: Int, numTasks: Int): Unit = {
+    this.accumulator = new SerializedListAccumulator[T]
+  }
+}
+
+class CollectRowTableSink extends CollectTableSink[Row](new RowTypeInfo(_: _*))
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
new file mode 100644
index 0000000..c4a308d
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/DataStreamTableSink.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.annotation.Internal
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.{Table, TableException}
+
+/**
+  * A [[DataStreamTableSink]] specifies how to emit a [[Table]] to an 
DataStream[T]
+  *
+  * @param table      The [[Table]] to emit.
+  * @param outputType The [[TypeInformation]] that specifies the type of the 
[[DataStream]].
+  * @param updatesAsRetraction Set to true to encode updates as retraction 
messages.
+  * @param withChangeFlag Set to true to emit records with change flags.
+  * @tparam T The type of the resulting [[DataStream]].
+  */
+@Internal
+class DataStreamTableSink[T](
+    table: Table,
+    outputType: TypeInformation[T],
+    val updatesAsRetraction: Boolean,
+    val withChangeFlag: Boolean) extends TableSink[T] {
+
+  private lazy val tableSchema = table.getSchema
+
+  /**
+    * Return the type expected by this [[TableSink]].
+    *
+    * This type should depend on the types returned by [[getFieldNames]].
+    *
+    * @return The type expected by this [[TableSink]].
+    */
+  override def getOutputType: TypeInformation[T] = outputType
+
+  /** Returns the types of the table fields. */
+  override def getFieldTypes: Array[TypeInformation[_]] = 
Array(tableSchema.getFieldTypes: _*)
+
+  /** Returns the names of the table fields. */
+  override def getFieldNames: Array[String] = tableSchema.getFieldNames
+
+  override def configure(
+      fieldNames: Array[String],
+      fieldTypes: Array[TypeInformation[_]]): TableSink[T] = {
+    throw new TableException(s"configure is not supported.")
+  }
+
+}
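For orientation, a rough sketch of how the conversion path described in StreamExecSink above would instantiate this internal sink; table and baseRowType stand for an existing query result and its type info and are assumptions, not part of this patch.

// minimal sketch: wrap a query result so it can be translated to a DataStream
val dataStreamSink = new DataStreamTableSink[BaseRow](
  table,                        // the Table to convert (assumed to exist)
  baseRowType,                  // hypothetical BaseRowTypeInfo of the result
  updatesAsRetraction = false,  // no retraction encoding requested
  withChangeFlag = false)       // plain append stream, no (Boolean, row) change flags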
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
similarity index 67%
copy from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
copy to 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
index 82b9590..2ee0449 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
@@ -18,8 +18,12 @@
 
 package org.apache.flink.table.sinks
 
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api.Table
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.{Table, Types}
+
+import java.lang.{Boolean => JBool}
 
 /**
   * Defines an external [[TableSink]] to emit a streaming [[Table]] with 
insert, update, and delete
@@ -27,8 +31,11 @@ import org.apache.flink.table.api.Table
   *
   * @tparam T Type of records that this [[TableSink]] expects and supports.
   */
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
+trait RetractStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
+
+  /** Returns the requested record type */
+  def getRecordType: TypeInformation[T]
+
+  override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
 
-  /** Emits the DataStream. */
-  def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
 }
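A minimal sketch of a retract sink against the reworked trait: records arrive as JTuple2[JBool, T], where true marks an add message and false a retraction. The class name and the print body are hypothetical, and TableSinkBase is again assumed to provide the configured schema.

import java.lang.{Boolean => JBool}

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.table.sinks.{RetractStreamTableSink, TableSinkBase}
import org.apache.flink.types.Row

// Hypothetical sink that logs add/retract messages to stdout.
class PrintRetractSink extends TableSinkBase[JTuple2[JBool, Row]]
  with RetractStreamTableSink[Row] {

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes: _*)

  // getOutputType is already provided by the trait as Tuple2(BOOLEAN, recordType).
  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): DataStreamSink[_] =
    dataStream.print()

  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = new PrintRetractSink
}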
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
new file mode 100644
index 0000000..330faac
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sinks
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.table.api.{Table, Types}
+
+import java.lang.{Boolean => JBool}
+
+
+/**
+  * Defines an external [[TableSink]] to emit a streaming [[Table]] with 
insert, update, and delete
+  * changes. The [[Table]] must have unique key fields (atomic or 
composite) or be append-only.
+  *
+  * If the [[Table]] does not have a unique key and is not append-only, a
+  * [[org.apache.flink.table.api.TableException]] will be thrown.
+  *
+  * The unique key of the table is configured by the 
[[UpsertStreamTableSink#setKeyFields()]]
+  * method.
+  *
+  * If the table is append-only, all messages will have a true flag and must 
be interpreted
+  * as insertions.
+  *
+  * @tparam T Type of records that this [[TableSink]] expects and supports.
+  */
+trait UpsertStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
+
+  /**
+    * Configures the unique key fields of the [[Table]] to write.
+    * The method is called after [[TableSink.configure()]].
+    *
+    * The keys array might be empty, if the table consists of a single 
(updated) record.
+    * If the table does not have a key and is append-only, the keys attribute 
is null.
+    *
+    * @param keys the field names of the table's keys, an empty array if the 
table has a single
+    *             row, and null if the table is append-only and has no key.
+    */
+  def setKeyFields(keys: Array[String]): Unit
+
+  /**
+    * Specifies whether the [[Table]] to write is append-only or not.
+    *
+    * @param isAppendOnly true if the table is append-only, false otherwise.
+    */
+  def setIsAppendOnly(isAppendOnly: JBool): Unit
+
+  /** Returns the requested record type */
+  def getRecordType: TypeInformation[T]
+
+  override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+
+}
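A corresponding upsert sketch (same imports as the retract sketch above, plus UpsertStreamTableSink). In this commit StreamExecSink only calls setIsAppendOnly; setKeyFields is part of the contract as well. Everything besides the trait's own methods is hypothetical.

// Hypothetical upsert sink; a real implementation would upsert or delete by key.
class PrintUpsertSink extends TableSinkBase[JTuple2[JBool, Row]]
  with UpsertStreamTableSink[Row] {

  private var keys: Array[String] = _   // may be null for keyless append-only tables
  private var appendOnly: JBool = _

  override def setKeyFields(keys: Array[String]): Unit = this.keys = keys

  override def setIsAppendOnly(isAppendOnly: JBool): Unit = this.appendOnly = isAppendOnly

  override def getRecordType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes: _*)

  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]): DataStreamSink[_] =
    dataStream.print()

  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = new PrintUpsertSink
}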
diff --git 
a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowUtil.java
 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowUtil.java
new file mode 100644
index 0000000..457e13f
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/util/BaseRowUtil.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.util;
+
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.dataformat.GenericRow;
+import org.apache.flink.table.dataformat.TypeGetterSetters;
+import org.apache.flink.table.type.InternalType;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+import org.apache.flink.util.StringUtils;
+
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.TimeZone;
+
+/**
+ * Utility for BaseRow.
+ */
+public class BaseRowUtil {
+
+       public static String baseRowToString(BaseRow value, BaseRowTypeInfo 
rowTypeInfo, TimeZone tz) {
+               return baseRowToString(value, rowTypeInfo, tz, true);
+       }
+
+       public static String baseRowToString(BaseRow value, BaseRowTypeInfo 
rowTypeInfo, TimeZone tz, boolean withHeader) {
+               GenericRow genericRow = toGenericRow(value, rowTypeInfo);
+               return genericRowToString(genericRow, tz, withHeader);
+       }
+
+       private static String fieldToString(Object field, TimeZone tz) {
+               if (field instanceof Date || field instanceof Time || field 
instanceof Timestamp) {
+                       // TODO support after FLINK-11898 is merged
+                       throw new UnsupportedOperationException();
+               } else {
+                       return StringUtils.arrayAwareToString(field);
+               }
+       }
+
+       private static String genericRowToString(GenericRow row, TimeZone tz, 
boolean withHeader) {
+               StringBuilder sb = new StringBuilder();
+               if (withHeader) {
+                       sb.append(row.getHeader()).append("|");
+               }
+               for (int i = 0; i < row.getArity(); i++) {
+                       if (i > 0) {
+                               sb.append(",");
+                       }
+                       sb.append(fieldToString(row.getField(i), tz));
+               }
+               return sb.toString();
+       }
+
+       private static GenericRow toGenericRow(BaseRow baseRow, BaseRowTypeInfo 
baseRowTypeInfo) {
+               if (baseRow instanceof GenericRow) {
+                       return (GenericRow) baseRow;
+               } else {
+                       int fieldNum = baseRow.getArity();
+                       GenericRow row = new GenericRow(fieldNum);
+                       row.setHeader(baseRow.getHeader());
+                       InternalType[] internalTypes = 
baseRowTypeInfo.getInternalTypes();
+                       for (int i = 0; i < fieldNum; i++) {
+                               if (baseRow.isNullAt(i)) {
+                                       row.setField(i, null);
+                               } else {
+                                       row.setField(i, 
TypeGetterSetters.get(baseRow, i, internalTypes[i]));
+                               }
+                       }
+                       return row;
+               }
+       }
+
+}
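For orientation, a small sketch of the string format this helper produces; it assumes a freshly created GenericRow carries header 0 (an accumulate message), which matches the "0|1,2,3" expectation in the streaming ValuesITCase below.

import java.util.TimeZone

import org.apache.flink.table.`type`.InternalTypes.INT
import org.apache.flink.table.dataformat.GenericRow
import org.apache.flink.table.typeutils.BaseRowTypeInfo
import org.apache.flink.table.util.BaseRowUtil

val row = GenericRow.of(Int.box(1), Int.box(2), Int.box(3))
val typeInfo = new BaseRowTypeInfo(INT, INT, INT)
// header first ("0" = accumulate), then the comma-separated fields -> "0|1,2,3"
BaseRowUtil.baseRowToString(row, typeInfo, TimeZone.getTimeZone("UTC"))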
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/ValuesITCase.scala
similarity index 61%
rename from 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
rename to 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/ValuesITCase.scala
index 82b9590..19f3d55 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BaseRetractStreamTableSink.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/ValuesITCase.scala
@@ -16,19 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.sinks
+package org.apache.flink.table.runtime.batch.sql
 
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api.Table
+import org.apache.flink.table.runtime.utils.BatchTestBase
 
-/**
-  * Defines an external [[TableSink]] to emit a streaming [[Table]] with 
insert, update, and delete
-  * changes.
-  *
-  * @tparam T Type of records that this [[TableSink]] expects and supports.
-  */
-trait BaseRetractStreamTableSink[T] extends StreamTableSink[T] {
+import org.junit.Assert._
+import org.junit.Test
+
+class ValuesITCase extends BatchTestBase {
+
+  @Test
+  def testValues(): Unit = {
+    val sqlQuery = "SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)"
+    val table = tEnv.sqlQuery(sqlQuery)
+    val actual = collectResults(table)
+    val expected = List("1,2,3")
+    assertEquals(expected.sorted, actual.sorted)
+  }
 
-  /** Emits the DataStream. */
-  def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/ValuesITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/ValuesITCase.scala
new file mode 100644
index 0000000..d081cb5
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/ValuesITCase.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableConfigOptions
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.runtime.utils.{StreamingTestBase, 
TestingAppendBaseRowSink}
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.`type`.InternalTypes.INT
+
+import org.junit.Assert._
+import org.junit.Test
+
+class ValuesITCase extends StreamingTestBase {
+
+  @Test
+  def testValues(): Unit = {
+    
tEnv.getConfig.getConf.setBoolean(TableConfigOptions.SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED,
 true)
+
+    val sqlQuery = "SELECT * FROM (VALUES (1, 2, 3)) T(a, b, c)"
+
+    val outputType = new BaseRowTypeInfo(INT, INT, INT)
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[BaseRow]
+    val sink = new TestingAppendBaseRowSink(outputType)
+    result.addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = List("0|1,2,3")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
new file mode 100644
index 0000000..0ef3c86
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/BatchTestBase.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.accumulators.SerializedListAccumulator
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api._
+import org.apache.flink.table.api.scala.{BatchTableEnvironment => 
ScalaBatchTableEnv}
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.sinks.{CollectTableSink, TableSinkBase}
+import 
org.apache.flink.table.`type`.TypeConverters.createInternalTypeFromTypeInfo
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.util.BaseRowUtil
+import org.apache.flink.util.AbstractID
+
+import _root_.java.util.{TimeZone, ArrayList => JArrayList}
+
+import _root_.scala.collection.JavaConversions._
+
+import org.junit.Before
+
+class BatchTestBase {
+
+  var env: StreamExecutionEnvironment = _
+  val conf: TableConfig = new TableConfig
+
+  // scala tableEnv
+  var tEnv: ScalaBatchTableEnv = _
+
+  @Before
+  def before(): Unit = {
+    // java env
+    val javaEnv = new LocalStreamEnvironment()
+    // scala env
+    this.env = new StreamExecutionEnvironment(javaEnv)
+    this.tEnv = ScalaBatchTableEnv.create(env)
+  }
+
+  def collectResults(table: Table): Seq[String] = {
+    val tableSchema = table.getSchema
+    val sink = new CollectBaseRowTableSink
+    val configuredSink = sink.configure(tableSchema.getFieldNames, 
tableSchema.getFieldTypes)
+                         .asInstanceOf[CollectBaseRowTableSink]
+    val outType = configuredSink.getOutputType.asInstanceOf[BaseRowTypeInfo]
+    val typeSerializer = outType.createSerializer(new ExecutionConfig)
+    val id = new AbstractID().toString
+    configuredSink.init(typeSerializer, id)
+    tEnv.writeToSink(table, configuredSink, "test")
+    val res = env.execute()
+
+    val accResult: JArrayList[Array[Byte]] = res.getAccumulatorResult(id)
+    val datas: Seq[BaseRow] = 
SerializedListAccumulator.deserializeList(accResult, typeSerializer)
+    val tz = TimeZone.getTimeZone("UTC")
+    datas.map(BaseRowUtil.baseRowToString(_, outType, tz, false))
+  }
+
+}
+
+class CollectBaseRowTableSink
+  extends CollectTableSink[BaseRow](
+    types => new BaseRowTypeInfo(types.map(createInternalTypeFromTypeInfo): 
_*)) {
+
+  override protected def copy: TableSinkBase[BaseRow] = {
+    new CollectBaseRowTableSink
+  }
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
new file mode 100644
index 0000000..3c5bdda
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, 
FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.dataformat.BaseRow
+import org.apache.flink.table.typeutils.BaseRowTypeInfo
+import org.apache.flink.table.util.BaseRowUtil
+
+import _root_.java.util.TimeZone
+import _root_.java.util.concurrent.atomic.AtomicInteger
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.mutable
+import _root_.scala.collection.mutable.ArrayBuffer
+
+object StreamTestSink {
+
+  TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+
+  private[utils] val idCounter: AtomicInteger = new AtomicInteger(0)
+
+  private[utils] val globalResults =
+    mutable.HashMap.empty[Int, mutable.Map[Int, ArrayBuffer[String]]]
+  private[utils] val globalRetractResults =
+    mutable.HashMap.empty[Int, mutable.Map[Int, ArrayBuffer[String]]]
+  private[utils] val globalUpsertResults =
+    mutable.HashMap.empty[Int, mutable.Map[Int, mutable.Map[String, String]]]
+
+  private[utils] def getNewSinkId: Int = {
+    val idx = idCounter.getAndIncrement()
+    this.synchronized{
+      globalResults.put(idx, mutable.HashMap.empty[Int, ArrayBuffer[String]])
+      globalRetractResults.put(idx, mutable.HashMap.empty[Int, 
ArrayBuffer[String]])
+      globalUpsertResults.put(idx, mutable.HashMap.empty[Int, 
mutable.Map[String, String]])
+    }
+    idx
+  }
+
+  def clear(): Unit = {
+    globalResults.clear()
+    globalRetractResults.clear()
+    globalUpsertResults.clear()
+  }
+}
+
+abstract class AbstractExactlyOnceSink[T] extends RichSinkFunction[T] with 
CheckpointedFunction {
+  protected var resultsState: ListState[String] = _
+  protected var localResults: ArrayBuffer[String] = _
+  protected val idx: Int = StreamTestSink.getNewSinkId
+
+  protected var globalResults: mutable.Map[Int, ArrayBuffer[String]]= _
+  protected var globalRetractResults: mutable.Map[Int, ArrayBuffer[String]] = _
+  protected var globalUpsertResults: mutable.Map[Int, mutable.Map[String, 
String]] = _
+
+  override def initializeState(context: FunctionInitializationContext): Unit = 
{
+    resultsState = context.getOperatorStateStore
+      .getListState(new ListStateDescriptor[String]("sink-results", 
Types.STRING))
+
+    localResults = mutable.ArrayBuffer.empty[String]
+
+    if (context.isRestored) {
+      for (value <- resultsState.get().asScala) {
+        localResults += value
+      }
+    }
+
+    val taskId = getRuntimeContext.getIndexOfThisSubtask
+    StreamTestSink.synchronized(
+      StreamTestSink.globalResults(idx) += (taskId -> localResults)
+    )
+  }
+
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    resultsState.clear()
+    for (value <- localResults) {
+      resultsState.add(value)
+    }
+  }
+
+  protected def clearAndStashGlobalResults(): Unit = {
+    if (globalResults == null) {
+      StreamTestSink.synchronized{
+        globalResults = StreamTestSink.globalResults.remove(idx).get
+        globalRetractResults = 
StreamTestSink.globalRetractResults.remove(idx).get
+        globalUpsertResults = 
StreamTestSink.globalUpsertResults.remove(idx).get
+      }
+    }
+  }
+
+  protected def getResults: List[String] = {
+    clearAndStashGlobalResults()
+    val result = ArrayBuffer.empty[String]
+    this.globalResults.foreach {
+      case (_, list) => result ++= list
+    }
+    result.toList
+  }
+}
+
+final class TestingAppendBaseRowSink(
+    rowTypeInfo: BaseRowTypeInfo, tz: TimeZone)
+  extends AbstractExactlyOnceSink[BaseRow] {
+
+  def this(rowTypeInfo: BaseRowTypeInfo) {
+    this(rowTypeInfo, TimeZone.getTimeZone("UTC"))
+  }
+
+  def invoke(value: BaseRow): Unit = localResults +=
+    BaseRowUtil.baseRowToString(value, rowTypeInfo, tz)
+
+  def getAppendResults: List[String] = getResults
+
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
new file mode 100644
index 0000000..d3c30b7
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamingTestBase.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala.StreamTableEnvironment
+
+import org.junit.rules.{ExpectedException, TemporaryFolder}
+import org.junit.{Before, Rule}
+
+class StreamingTestBase {
+
+  var env: StreamExecutionEnvironment = _
+  var tEnv: StreamTableEnvironment = _
+  val _tempFolder = new TemporaryFolder
+  var enableObjectReuse = true
+  // used for accurate exception information checking.
+  val expectedException = ExpectedException.none()
+
+  @Rule
+  def thrown: ExpectedException = expectedException
+
+  @Rule
+  def tempFolder: TemporaryFolder = _tempFolder
+
+  @Before
+  def before(): Unit = {
+    StreamTestSink.clear()
+    this.env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(4)
+    this.env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    if (enableObjectReuse) {
+      this.env.getConfig.enableObjectReuse()
+    }
+    this.tEnv = StreamTableEnvironment.create(env)
+  }
+
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
index 0678e71..5c5909f 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/api/TableConfigOptions.java
@@ -28,6 +28,16 @@ import static 
org.apache.flink.configuration.ConfigOptions.key;
 public class TableConfigOptions {
 
        // 
------------------------------------------------------------------------
+       //  Source Options
+       // 
------------------------------------------------------------------------
+
+       public static final ConfigOption<Boolean> 
SQL_EXEC_SOURCE_VALUES_INPUT_ENABLED =
+                       key("sql.exec.source.values-input.enabled")
+                                       .defaultValue(false)
+                                       .withDescription("Whether to support a values source input. This feature is disabled " +
+                                                                       "by default because checkpointing does not work properly after the source has finished.");
+
+       // 
------------------------------------------------------------------------
        //  Sort Options
        // 
------------------------------------------------------------------------
 
@@ -74,4 +84,13 @@ public class TableConfigOptions {
                                        .defaultValue(Long.MIN_VALUE)
                                        .withDescription("MiniBatch allow 
latency(ms). Value > 0 means MiniBatch enabled.");
 
+       // 
------------------------------------------------------------------------
+       //  STATE BACKEND Options
+       // 
------------------------------------------------------------------------
+
+       public static final ConfigOption<Boolean> 
SQL_EXEC_STATE_BACKEND_ON_HEAP =
+                       key("sql.exec.statebackend.onheap")
+                                       .defaultValue(false)
+                                       .withDescription("Whether the 
state backend is on heap.");
+
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
index f0484f7..4154fe7 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/dataformat/GenericRow.java
@@ -117,6 +117,10 @@ public final class GenericRow extends ObjectArrayRow {
                this.fields[ordinal] = value;
        }
 
+       public Object getField(int ordinal) {
+               return this.fields[ordinal];
+       }
+
        public static GenericRow of(Object... values) {
                GenericRow row = new GenericRow(values.length);
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
index 043340b..736fbcd 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
@@ -66,4 +66,16 @@ public abstract class GeneratedClass<T> implements 
Serializable {
                }
                return compiledClass;
        }
+
+       public String getClassName() {
+               return className;
+       }
+
+       public String getCode() {
+               return code;
+       }
+
+       public Object[] getReferences() {
+               return references;
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/values/ValuesInputFormat.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/values/ValuesInputFormat.java
new file mode 100644
index 0000000..45ddd4c
--- /dev/null
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/values/ValuesInputFormat.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.values;
+
+import org.apache.flink.api.common.io.GenericInputFormat;
+import org.apache.flink.api.common.io.NonParallelInput;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.generated.GeneratedInput;
+import org.apache.flink.table.typeutils.BaseRowTypeInfo;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Generated ValuesInputFormat.
+ */
+public class ValuesInputFormat
+               extends GenericInputFormat<BaseRow>
+               implements NonParallelInput, ResultTypeQueryable<BaseRow> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ValuesInputFormat.class);
+       private GeneratedInput<GenericInputFormat<BaseRow>> generatedInput;
+       private BaseRowTypeInfo returnType;
+       private GenericInputFormat<BaseRow> format;
+
+       public ValuesInputFormat(GeneratedInput<GenericInputFormat<BaseRow>> 
generatedInput, BaseRowTypeInfo returnType) {
+               this.generatedInput = generatedInput;
+               this.returnType = returnType;
+       }
+
+       @Override
+       public void open(GenericInputSplit split) {
+               LOG.debug("Compiling GenericInputFormat: {} \n\n Code:\n{}",
+                               generatedInput.getClassName(), 
generatedInput.getCode());
+               LOG.debug("Instantiating GenericInputFormat.");
+               format = 
generatedInput.newInstance(getRuntimeContext().getUserCodeClassLoader());
+       }
+
+       @Override
+       public boolean reachedEnd() throws IOException {
+               return format.reachedEnd();
+       }
+
+       @Override
+       public BaseRow nextRecord(BaseRow reuse) throws IOException {
+               return format.nextRecord(reuse);
+       }
+
+       @Override
+       public TypeInformation<BaseRow> getProducedType() {
+               return returnType;
+       }
+
+}
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
index 11e57f8..8778767 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/typeutils/BaseRowTypeInfo.java
@@ -142,4 +142,9 @@ public class BaseRowTypeInfo extends 
TupleTypeInfoBase<BaseRow> {
        public BaseRowSerializer createSerializer(ExecutionConfig config) {
                return new BaseRowSerializer(config, internalTypes);
        }
+
+       public InternalType[] getInternalTypes() {
+               return internalTypes;
+       }
+
 }
