This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 851d27bb54596d1044a831160b09aad9f3b32316
Author: Jark Wu <[email protected]>
AuthorDate: Mon Jun 10 20:26:04 2019 +0800

    [FLINK-12708][table] Introduce OutputFormatTableSink and make the Blink & Flink planners support it
    
    1. Introduce OutputFormatTableSink, which extends StreamTableSink and only exposes a getOutputFormat interface
    2. Add a consumeDataStream(DataStream) method to StreamTableSink that always returns a DataStreamSink, and make implementing it mandatory in the Blink planner
---
 .../apache/flink/table/sinks/BatchTableSink.java   |  3 ++
 ...amTableSink.java => OutputFormatTableSink.java} | 27 +++++++++++++---
 .../apache/flink/table/sinks/StreamTableSink.java  | 21 ++++++++++++-
 .../plan/nodes/physical/batch/BatchExecSink.scala  | 29 +++++++++++------
 .../nodes/physical/stream/StreamExecSink.scala     | 15 ++++++---
 .../apache/flink/table/sinks/BatchTableSink.scala  | 36 ----------------------
 .../flink/table/sinks/CollectTableSink.scala       | 19 ++++++------
 .../flink/table/sinks/RetractStreamTableSink.scala | 20 ++++++++++--
 .../apache/flink/table/sinks/StreamTableSink.scala | 33 --------------------
 .../flink/table/sinks/UpsertStreamTableSink.scala  | 18 ++++++++---
 .../flink/table/runtime/utils/StreamTestSink.scala | 31 +++++++++++--------
 .../apache/flink/table/api/BatchTableEnvImpl.scala | 24 +++++++++++----
 .../flink/table/api/StreamTableEnvImpl.scala       |  6 ++--
 .../runtime/batch/table/TableSinkITCase.scala      | 33 +++++++++++++++++++-
 .../table/utils/MemoryTableSourceSinkUtil.scala    | 30 ++++++++++++++++--
 15 files changed, 213 insertions(+), 132 deletions(-)
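For context, wiring the new sink into a batch program end to end looks roughly like the following Java sketch (the sink instance, table, and schema are illustrative and not part of this commit; BatchTableEnvironment.create and registerTableSink are the pre-existing registration APIs):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.sinks.OutputFormatTableSink;
    import org.apache.flink.types.Row;

    public final class SinkUsageSketch {

        // Register an OutputFormatTableSink under a name and write a Table into it.
        static void writeToSink(
                ExecutionEnvironment env,
                Table table,
                OutputFormatTableSink<Row> sink,
                String[] fieldNames,
                TypeInformation<?>[] fieldTypes) throws Exception {
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
            // configure() returns a copy of the sink bound to the given schema
            tEnv.registerTableSink("outputSink", sink.configure(fieldNames, fieldTypes));
            table.insertInto("outputSink");
            env.execute();
        }
    }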

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
index cbf7845..f9413b7 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
@@ -24,7 +24,10 @@ import org.apache.flink.table.api.Table;
 /** Defines an external {@link TableSink} to emit a batch {@link Table}.
  *
  * @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
+ *
+ * @deprecated use {@link OutputFormatTableSink} instead.
  */
+@Deprecated
 public interface BatchTableSink<T> extends TableSink<T> {
 
        /** Emits the DataSet. */
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
similarity index 51%
copy from flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
copy to flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
index e3131f3..5ab2606 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
@@ -18,15 +18,32 @@
 
 package org.apache.flink.table.sinks;
 
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.Table;
 
 /**
- * Defines an external stream table and provides write access to its data.
+ * Defines an external {@link TableSink} to emit a bounded {@link Table}.
  *
- * @param <T> Type of the {@link DataStream} created by this {@link TableSink}.
+ * @param <T> Type of the bounded {@link OutputFormat} that this {@link TableSink} expects and supports.
  */
-public interface StreamTableSink<T> extends TableSink<T> {
+@Experimental
+public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {
 
-       /** Emits the DataStream. */
-       void emitDataStream(DataStream<T> dataStream);
+       /**
+        * Returns an {@link OutputFormat} for writing the data of the table.
+        */
+       public abstract OutputFormat<T> getOutputFormat();
+
+       @Override
+       public final void emitDataStream(DataStream<T> dataStream) {
+               consumeDataStream(dataStream);
+       }
+
+       @Override
+       public final DataStreamSink<T> consumeDataStream(DataStream<T> dataStream) {
+               return dataStream.writeUsingOutputFormat(getOutputFormat());
+       }
 }
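
To illustrate the new contract, a minimal concrete sink only has to supply an OutputFormat plus the usual TableSink schema plumbing. A sketch (TextOutputFormat and the output path are illustrative choices; any OutputFormat<T> works):

    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.io.TextOutputFormat;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.table.sinks.OutputFormatTableSink;
    import org.apache.flink.table.sinks.TableSink;
    import org.apache.flink.types.Row;

    public class TextFileTableSink extends OutputFormatTableSink<Row> {

        private final String path;
        private String[] fieldNames;
        private TypeInformation<?>[] fieldTypes;

        public TextFileTableSink(String path) {
            this.path = path;
        }

        @Override
        public OutputFormat<Row> getOutputFormat() {
            // the single method the new base class requires
            return new TextOutputFormat<>(new Path(path));
        }

        @Override
        public TypeInformation<Row> getOutputType() {
            return new RowTypeInfo(fieldTypes, fieldNames);
        }

        @Override
        public String[] getFieldNames() {
            return fieldNames;
        }

        @Override
        public TypeInformation<?>[] getFieldTypes() {
            return fieldTypes;
        }

        @Override
        public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
            // the TableSink contract asks for a configured copy
            TextFileTableSink configured = new TextFileTableSink(path);
            configured.fieldNames = fieldNames;
            configured.fieldTypes = fieldTypes;
            return configured;
        }
    }

Because emitDataStream and consumeDataStream are final in the base class, the DataStream wiring comes for free and the sink author deals only with the OutputFormat.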
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
index e3131f3..825d5bb 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.sinks;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 
 /**
  * Defines an external stream table and provides write access to its data.
@@ -27,6 +28,24 @@ import org.apache.flink.streaming.api.datastream.DataStream;
  */
 public interface StreamTableSink<T> extends TableSink<T> {
 
-       /** Emits the DataStream. */
+       /**
+        * Emits the DataStream.
+        *
+        * @deprecated This method will be removed in future versions as it returns nothing.
+        *  It is recommended to use {@link #consumeDataStream(DataStream)} instead, which
+        *  returns the {@link DataStreamSink}. The returned {@link DataStreamSink} will be
+        *  used to set resources for the sink operator. If {@link #consumeDataStream(DataStream)}
+        *  is implemented, this method can have an empty implementation.
+        */
+       @Deprecated
        void emitDataStream(DataStream<T> dataStream);
+
+       /**
+        * Consumes the DataStream and returns the sink transformation {@link DataStreamSink}.
+        * The returned {@link DataStreamSink} will be used to set resources for the sink operator.
+        */
+       default DataStreamSink<?> consumeDataStream(DataStream<T> dataStream) {
+               emitDataStream(dataStream);
+               return null;
+       }
 }
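
For existing custom sinks, the intended migration is to move the body of emitDataStream into consumeDataStream and keep the deprecated method as a thin delegate. A sketch (PrintSinkFunction is an arbitrary stand-in; the class is kept abstract so the TableSink schema plumbing can stay elided):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    import org.apache.flink.table.sinks.StreamTableSink;
    import org.apache.flink.types.Row;

    public abstract class MigratedStreamTableSink implements StreamTableSink<Row> {

        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
            // returning the DataStreamSink lets the planner set resources on the sink operator
            return dataStream.addSink(new PrintSinkFunction<>()).name("migrated-sink");
        }

        @Override
        public void emitDataStream(DataStream<Row> dataStream) {
            // deprecated entry point: delegate so both code paths behave identically
            consumeDataStream(dataStream);
        }
    }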
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
index f7565a7..376630c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/batch/BatchExecSink.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.plan.nodes.physical.batch
 
+import java.util
 import org.apache.flink.runtime.operators.DamBehavior
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.transformations.{OneInputTransformation, StreamTransformation}
@@ -28,15 +29,12 @@ import org.apache.flink.table.dataformat.BaseRow
 import org.apache.flink.table.plan.nodes.calcite.Sink
 import org.apache.flink.table.plan.nodes.exec.{BatchExecNode, ExecNode}
 import org.apache.flink.table.plan.nodes.resource.batch.parallelism.NodeResourceConfig
-import org.apache.flink.table.sinks.{BatchTableSink, DataStreamTableSink, TableSink}
+import org.apache.flink.table.sinks.{DataStreamTableSink, RetractStreamTableSink, StreamTableSink, TableSink, UpsertStreamTableSink}
 import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
 import org.apache.flink.table.typeutils.BaseRowTypeInfo
-
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 
-import java.util
-
 import scala.collection.JavaConversions._
 
 /**
@@ -78,11 +76,21 @@ class BatchExecSink[T](
   override protected def translateToPlanInternal(
       tableEnv: BatchTableEnvironment): StreamTransformation[Any] = {
     val resultTransformation = sink match {
-      case batchTableSink: BatchTableSink[T] =>
+      case _: RetractStreamTableSink[T] | _: UpsertStreamTableSink[T] =>
+        throw new TableException("RetractStreamTableSink and 
UpsertStreamTableSink is not" +
+          " supported in Batch environment.")
+
+      case streamTableSink: StreamTableSink[T] =>
+        // we can insert the bounded DataStream into a StreamTableSink
         val transformation = translateToStreamTransformation(withChangeFlag = false, tableEnv)
         val boundedStream = new DataStream(tableEnv.streamEnv, transformation)
-        val sinkTransformation = batchTableSink.emitBoundedStream(
-          boundedStream, tableEnv.getConfig, tableEnv.streamEnv.getConfig).getTransformation
+        val dsSink = streamTableSink.consumeDataStream(boundedStream)
+        if (dsSink == null) {
+          throw new TableException("The 
StreamTableSink#consumeDataStream(DataStream) must be " +
+            "implemented and return the sink transformation DataStreamSink. " +
+            s"However, ${sink.getClass.getCanonicalName} doesn't implement 
this method.")
+        }
+        val sinkTransformation = dsSink.getTransformation
 
         if (sinkTransformation.getMaxParallelism > 0) {
           sinkTransformation.setParallelism(sinkTransformation.getMaxParallelism)
@@ -97,15 +105,16 @@ class BatchExecSink[T](
         }
         sinkTransformation
 
-      case streamTableSink: DataStreamTableSink[T] =>
+      case dsTableSink: DataStreamTableSink[T] =>
         // In case of table to bounded stream through BatchTableEnvironment#toBoundedStream, we
         // insert a DataStreamTableSink then wrap it as a LogicalSink, there is no real batch table
         // sink, so we do not need to invoke TableSink#emitBoundedStream and set resource, just a
         // translation to StreamTransformation is ok.
-        translateToStreamTransformation(withChangeFlag = streamTableSink.withChangeFlag, tableEnv)
+        translateToStreamTransformation(withChangeFlag = dsTableSink.withChangeFlag, tableEnv)
 
       case _ =>
-        throw new TableException("Only Support BatchTableSink or 
DataStreamTableSink!")
+        throw new TableException(s"Only Support StreamTableSink! " +
+          s"However ${sink.getClass.getCanonicalName} is not a 
StreamTableSink.")
     }
     resultTransformation.asInstanceOf[StreamTransformation[Any]]
   }
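
The null check above targets legacy sinks that only implement the deprecated method: such a sink inherits the default consumeDataStream, which returns null. A sketch of a sink that the Blink planner now rejects (again with a placeholder SinkFunction):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
    import org.apache.flink.table.sinks.StreamTableSink;
    import org.apache.flink.types.Row;

    public abstract class LegacyOnlySink implements StreamTableSink<Row> {

        @Override
        public void emitDataStream(DataStream<Row> dataStream) {
            // no DataStreamSink is returned, so the inherited default
            // consumeDataStream yields null and translateToPlanInternal throws
            dataStream.addSink(new PrintSinkFunction<>());
        }
    }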
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
index 9ec0f6e..0bab82b 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala
@@ -115,17 +115,24 @@ class StreamExecSink[T](
                 "RetractStreamTableSink, or UpsertStreamTableSink.")
         }
         val dataStream = new DataStream(tableEnv.execEnv, transformation)
-        streamTableSink.emitDataStream(dataStream).getTransformation
+        val dsSink = streamTableSink.consumeDataStream(dataStream)
+        if (dsSink == null) {
+          throw new TableException("The 
StreamTableSink#consumeDataStream(DataStream) must be " +
+            "implemented and return the sink transformation DataStreamSink. " +
+            s"However, ${sink.getClass.getCanonicalName} doesn't implement 
this method.")
+        }
+        dsSink.getTransformation
 
-      case streamTableSink: DataStreamTableSink[_] =>
+      case dsTableSink: DataStreamTableSink[_] =>
         // In case of table to stream through BatchTableEnvironment#translateToDataStream,
         // we insert a DataStreamTableSink then wrap it as a LogicalSink, there is no real batch
         // table sink, so we do not need to invoke TableSink#emitBoundedStream and set resource,
         // just a translation to StreamTransformation is ok.
-        translateToStreamTransformation(streamTableSink.withChangeFlag, tableEnv)
+        translateToStreamTransformation(dsTableSink.withChangeFlag, tableEnv)
 
       case _ =>
-        throw new TableException("Only Support StreamTableSink or 
DataStreamTableSink!")
+        throw new TableException(s"Only Support StreamTableSink! " +
+          s"However ${sink.getClass.getCanonicalName} is not a 
StreamTableSink.")
     }
     resultTransformation.asInstanceOf[StreamTransformation[Any]]
   }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
deleted file mode 100644
index 8ab44b4..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api._
-
-/** Defines an external [[TableSink]] to emit a batch [[Table]].
-  *
-  * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports.
-  */
-trait BatchTableSink[T] extends TableSink[T] {
-
-  /** Emits the DataStream. */
-  def emitBoundedStream(
-      boundedStream: DataStream[T],
-      tableConfig: TableConfig,
-      executionConfig: ExecutionConfig): DataStreamSink[_]
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
index 8c50f57..40e3a02 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.sinks
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.io.RichOutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -26,24 +25,24 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api._
 import org.apache.flink.types.Row
 
 /**
   * A simple [[TableSink]] to emit data as T to a collection.
   */
 class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => TypeInformation[T]))
-  extends TableSinkBase[T] with BatchTableSink[T] {
+  extends TableSinkBase[T] with StreamTableSink[T] {
 
   private var collectOutputFormat: CollectOutputFormat[T] = _
 
-  override def emitBoundedStream(
-      boundedStream: DataStream[T],
-      tableConfig: TableConfig,
-      executionConfig: ExecutionConfig): DataStreamSink[T] = {
-    boundedStream.writeUsingOutputFormat(collectOutputFormat)
-        .setParallelism(1)
-        .name("collect")
+  override def consumeDataStream(dataStream: DataStream[T]): DataStreamSink[_] = {
+    dataStream.writeUsingOutputFormat(collectOutputFormat)
+      .setParallelism(1)
+      .name("collect")
+  }
+
+  override def emitDataStream(dataStream: DataStream[T]): Unit = {
+    consumeDataStream(dataStream)
   }
 
   override protected def copy: TableSinkBase[T] = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
index 2ee0449..2d7c46f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
@@ -18,17 +18,28 @@
 
 package org.apache.flink.table.sinks
 
+import java.lang.{Boolean => JBool}
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
-import org.apache.flink.table.api.{Table, Types}
+import org.apache.flink.table.api.Types
 
-import java.lang.{Boolean => JBool}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.Table
 
 /**
   * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
   * changes.
   *
+  * The table will be converted into a stream of accumulate and retraction messages which are
+  * encoded as [[JTuple2]].
+  * The first field is a [[JBool]] flag to indicate the message type.
+  * The second field holds the record of the requested type [[T]].
+  *
+  * A message with a true [[JBool]] flag is an accumulate (or add) message.
+  * A message with a false flag is a retract message.
+  *
   * @tparam T Type of records that this [[TableSink]] expects and supports.
   */
 trait RetractStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
@@ -36,6 +47,9 @@ trait RetractStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
   /** Returns the requested record type */
   def getRecordType: TypeInformation[T]
 
+  /** Emits the DataStream. */
+  def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit
+
   override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
 
 }
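
The Java counterpart of this trait carries the same encoding, so a retract sink effectively consumes a DataStream<Tuple2<Boolean, T>>. A sketch of decoding the change flag under that assumption (the printing sink itself is illustrative):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.table.sinks.RetractStreamTableSink;
    import org.apache.flink.types.Row;

    public abstract class LoggingRetractSink implements RetractStreamTableSink<Row> {

        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
            return dataStream
                // true flag = accumulate (add) message, false flag = retract message
                .map(msg -> (msg.f0 ? "+ " : "- ") + msg.f1)
                .returns(Types.STRING)
                .print();
        }
    }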
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
deleted file mode 100644
index 3c4b556..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-
-/**
-  * Defines an external stream table and provides write access to its data.
-  *
-  * @tparam T Type of the [[DataStream]] created by this [[TableSink]].
-  */
-trait StreamTableSink[T] extends TableSink[T] {
-
-  /** Emits the DataStream. */
-  def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
-
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
index 330faac..d3be507 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
@@ -18,14 +18,14 @@
 
 package org.apache.flink.table.sinks
 
+import java.lang.{Boolean => JBool}
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{Table, Types}
 
-import java.lang.{Boolean => JBool}
-
-
 /**
   * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
   * changes. The [[Table]] must have unique key fields (atomic or composite) or be append-only.
@@ -36,6 +36,13 @@ import java.lang.{Boolean => JBool}
   * The unique key of the table is configured by the [[UpsertStreamTableSink#setKeyFields()]]
   * method.
   *
+  * The [[Table]] will be converted into a stream of upsert and delete messages which are encoded as
+  * [[JTuple2]]. The first field is a [[JBool]] flag to indicate the message type. The second field
+  * holds the record of the requested type [[T]].
+  *
+  * A message with a true [[JBool]] flag is an upsert message for the configured key.
+  * A message with a false flag is a delete message for the configured key.
+  *
   * If the table is append-only, all messages will have a true flag and must be interpreted
   * as insertions.
   *
@@ -65,6 +72,9 @@ trait UpsertStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
   /** Returns the requested record type */
   def getRecordType: TypeInformation[T]
 
-  override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+  /** Emits the DataStream. */
+  def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit
 
+  override def getOutputType: TypeInformation[JTuple2[JBool, T]] =
+    new TupleTypeInfo(Types.BOOLEAN, getRecordType)
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
index 4fa7b6d..31ab797 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.runtime.utils
 
-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
@@ -30,7 +29,7 @@ import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSn
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.api.{TableConfig, Types}
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.dataformat.{BaseRow, DataFormatConverters, GenericRow}
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter
@@ -276,7 +275,8 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
 
   override def getFieldTypes: Array[TypeInformation[_]] = fTypes
 
-  override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, BaseRow]]) = {
+  override def consumeDataStream(
+      dataStream: DataStream[JTuple2[JBoolean, BaseRow]]): DataStreamSink[_] = {
     dataStream.map(new MapFunction[JTuple2[JBoolean, BaseRow], (Boolean, BaseRow)] {
       override def map(value: JTuple2[JBoolean, BaseRow]): (Boolean, BaseRow) = {
         (value.f0, value.f1)
@@ -294,6 +294,10 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
       .setParallelism(dataStream.getParallelism)
   }
 
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, BaseRow]]): Unit = {
+    consumeDataStream(dataStream)
+  }
+
   override def configure(
       fieldNames: Array[String],
       fieldTypes: Array[TypeInformation[_]]): TestingUpsertTableSink = {
@@ -310,8 +314,7 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
   def getUpsertResults: List[String] = sink.getUpsertResults
 }
 
-final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row]
-  with BatchTableSink[Row] {
+final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row] {
   var fNames: Array[String] = _
   var fTypes: Array[TypeInformation[_]] = _
   var sink = new TestingAppendSink(tz)
@@ -321,16 +324,13 @@ final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[R
     this(TimeZone.getTimeZone("UTC"))
   }
 
-  override def emitDataStream(dataStream: DataStream[Row]): DataStreamSink[Row] = {
+  override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
     dataStream.addSink(sink).name("TestingAppendTableSink")
       .setParallelism(dataStream.getParallelism)
   }
 
-  override def emitBoundedStream(
-      boundedStream: DataStream[Row],
-      tableConfig: TableConfig,
-      executionConfig: ExecutionConfig): DataStreamSink[Row] = {
-    boundedStream.writeUsingOutputFormat(outputFormat).name("appendTableSink")
+  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+    consumeDataStream(dataStream)
   }
 
   override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
@@ -352,7 +352,7 @@ final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[R
 
   def getAppendResults: List[String] = sink.getAppendResults
 
-  def getResults: List[String] = outputFormat.getResults
+  def getResults: List[String] = sink.getAppendResults
 }
 
 class TestingOutputFormat[T](tz: TimeZone)
@@ -483,7 +483,8 @@ final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink
     this(TimeZone.getTimeZone("UTC"))
   }
 
-  override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, Row]]) = {
+  override def consumeDataStream(
+      dataStream: DataStream[JTuple2[JBoolean, Row]]): DataStreamSink[_] = {
     dataStream.map(new MapFunction[JTuple2[JBoolean, Row], (Boolean, Row)] {
       override def map(value: JTuple2[JBoolean, Row]): (Boolean, Row) = {
         (value.f0, value.f1)
@@ -494,6 +495,10 @@ final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink
       .setParallelism(dataStream.getParallelism)
   }
 
+  override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, Row]]): Unit = {
+    consumeDataStream(dataStream)
+  }
+
   override def getRecordType: TypeInformation[Row] =
     new RowTypeInfo(fTypes, fNames)
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
index 2e6401d..7c7eefe 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
@@ -71,16 +71,16 @@ abstract class BatchTableEnvImpl(
 
     if (!tableSource.isInstanceOf[BatchTableSource[_]] &&
         !tableSource.isInstanceOf[InputFormatTableSource[_]]) {
-      throw new TableException("Only BatchTableSource and 
InputFormatTableSource can be registered " +
-        "in BatchTableEnvironment.")
+      throw new TableException("Only BatchTableSource and 
InputFormatTableSource " +
+        "can be registered in BatchTableEnvironment.")
     }
   }
 
   override protected  def validateTableSink(configuredSink: TableSink[_]): Unit = {
     if (!configuredSink.isInstanceOf[BatchTableSink[_]] &&
-        !configuredSink.isInstanceOf[BoundedTableSink[_]]) {
-      throw new TableException("Only BatchTableSink and BoundedTableSink can 
be registered " +
-        "in BatchTableEnvironment.")
+        !configuredSink.isInstanceOf[OutputFormatTableSink[_]]) {
+      throw new TableException("Only BatchTableSink and OutputFormatTableSink 
" +
+        "can be registered in BatchTableEnvironment.")
     }
   }
 
@@ -119,8 +119,20 @@ abstract class BatchTableEnvImpl(
         val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
         // Give the DataSet to the TableSink to emit it.
         batchSink.emitDataSet(result)
+      case boundedSink: OutputFormatTableSink[T] =>
+        val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
+          .asInstanceOf[TypeInformation[T]]
+        // translate the Table into a DataSet and provide the type that the TableSink expects.
+        val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
+        // use the OutputFormat to consume the DataSet.
+        val dataSink = result.output(boundedSink.getOutputFormat)
+        dataSink.name(
+          TableConnectorUtils.generateRuntimeName(
+            boundedSink.getClass,
+            boundedSink.getTableSchema.getFieldNames))
       case _ =>
-        throw new TableException("BatchTableSink required to emit batch 
Table.")
+        throw new TableException(
+          "BatchTableSink or OutputFormatTableSink required to emit batch 
Table.")
     }
   }
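
In DataSet terms, the new OutputFormatTableSink branch boils down to translate-then-output; a simplified Java sketch (the real code also derives the operator name from the sink class and schema):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.table.sinks.OutputFormatTableSink;
    import org.apache.flink.types.Row;

    final class EmitSketch {

        static void emitBounded(DataSet<Row> translated, OutputFormatTableSink<Row> sink) {
            // hand the translated DataSet to the sink's OutputFormat
            translated.output(sink.getOutputFormat()).name("output-format-sink");
        }
    }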
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
index 9078560..c351a46 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
@@ -156,7 +156,7 @@ abstract class StreamTableEnvImpl(
             withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         retractSink.asInstanceOf[RetractStreamTableSink[Any]]
-          .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
+          .consumeDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
 
       case upsertSink: UpsertStreamTableSink[_] =>
         // optimize plan
@@ -185,7 +185,7 @@ abstract class StreamTableEnvImpl(
             withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         upsertSink.asInstanceOf[UpsertStreamTableSink[Any]]
-          .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
+          .consumeDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
 
       case appendSink: AppendStreamTableSink[_] =>
         // optimize plan
@@ -206,7 +206,7 @@ abstract class StreamTableEnvImpl(
             streamQueryConfig,
             withChangeFlag = false)(outputType)
         // Give the DataStream to the TableSink to emit it.
-        appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result)
+        appendSink.asInstanceOf[AppendStreamTableSink[T]].consumeDataStream(result)
 
       case _ =>
         throw new TableException("Stream Tables can only be emitted by 
AppendStreamTableSink, " +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d630950..adb1a40 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.runtime.batch.table
 
 import java.io.File
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
@@ -28,11 +27,15 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil.UnsafeMemoryOutputFormatTableSink
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
+import scala.collection.JavaConverters._
+
 @RunWith(classOf[Parameterized])
 class TableSinkITCase(
     configMode: TableConfigMode)
@@ -70,4 +73,32 @@ class TableSinkITCase(
 
     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }
+
+  @Test
+  def testBoundedTableSink(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env, config)
+
+    val fieldNames = Array("c", "b")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
+    val sink = new UnsafeMemoryOutputFormatTableSink
+    tEnv.registerTableSink("testSink", sink.configure(fieldNames, fieldTypes))
+
+    val input = CollectionDataSets.get3TupleDataSet(env)
+      .map(x => x).setParallelism(4) // increase DOP to 4
+
+    input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .insertInto("testSink")
+
+    env.execute()
+
+    val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+    val expected = Seq(
+      "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
+      "Comment#12,6", "Comment#13,6", "Comment#14,6", 
"Comment#15,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(results, expected)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
index 37342c9..5559c62 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -19,8 +19,7 @@
 package org.apache.flink.table.utils
 
 import java.util
-
-import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.io.{OutputFormat, RichOutputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
@@ -31,7 +30,7 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources._
 import org.apache.flink.types.Row
 
@@ -126,6 +125,31 @@ object MemoryTableSourceSinkUtil {
     }
   }
 
+  final class UnsafeMemoryOutputFormatTableSink extends OutputFormatTableSink[Row] {
+
+    var fieldNames: Array[String] = _
+    var fieldTypes: Array[TypeInformation[_]] = _
+
+    override def getOutputType: TypeInformation[Row] = {
+      new RowTypeInfo(getTableSchema.getFieldTypes, getTableSchema.getFieldNames)
+    }
+
+    override def getOutputFormat: OutputFormat[Row] = new MemoryCollectionOutputFormat
+
+    override def configure(
+        fieldNames: Array[String],
+        fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
+      val newSink = new UnsafeMemoryOutputFormatTableSink
+      newSink.fieldNames = fieldNames
+      newSink.fieldTypes = fieldTypes
+      newSink
+    }
+
+    override def getFieldNames: Array[String] = fieldNames
+
+    override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+  }
+
   private class MemoryAppendSink extends RichSinkFunction[Row]() {
 
     override def invoke(value: Row): Unit = {
