This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 851d27bb54596d1044a831160b09aad9f3b32316
Author: Jark Wu <[email protected]>
AuthorDate: Mon Jun 10 20:26:04 2019 +0800

    [FLINK-12708][table] Introduce OutputFormatTableSink and make the Blink & Flink planners support it

    1. Introduce OutputFormatTableSink, which extends StreamTableSink and only exposes the getOutputFormat interface.
    2. Add a consumeDataStream(DataStream) method to StreamTableSink that always returns a DataStreamSink, and make implementing it mandatory in the Blink planner.
---
 .../apache/flink/table/sinks/BatchTableSink.java   |  3 ++
 ...amTableSink.java => OutputFormatTableSink.java} | 27 +++++++++++++---
 .../apache/flink/table/sinks/StreamTableSink.java  | 21 ++++++++++++-
 .../plan/nodes/physical/batch/BatchExecSink.scala  | 29 +++++++++++------
 .../nodes/physical/stream/StreamExecSink.scala     | 15 ++++++---
 .../apache/flink/table/sinks/BatchTableSink.scala  | 36 ----------------------
 .../flink/table/sinks/CollectTableSink.scala       | 19 ++++++------
 .../flink/table/sinks/RetractStreamTableSink.scala | 20 ++++++++++--
 .../apache/flink/table/sinks/StreamTableSink.scala | 33 --------------------
 .../flink/table/sinks/UpsertStreamTableSink.scala  | 18 ++++++++---
 .../flink/table/runtime/utils/StreamTestSink.scala | 31 +++++++++++--------
 .../apache/flink/table/api/BatchTableEnvImpl.scala | 24 +++++++++++----
 .../flink/table/api/StreamTableEnvImpl.scala       |  6 ++--
 .../runtime/batch/table/TableSinkITCase.scala      | 33 +++++++++++++++++++-
 .../table/utils/MemoryTableSourceSinkUtil.scala    | 30 ++++++++++++++++--
 15 files changed, 213 insertions(+), 132 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
index cbf7845..f9413b7 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/BatchTableSink.java
@@ -24,7 +24,10 @@ import org.apache.flink.table.api.Table;
 /** Defines an external {@link TableSink} to emit a batch {@link Table}.
  *
  * @param <T> Type of {@link DataSet} that this {@link TableSink} expects and supports.
+ *
+ * @deprecated use {@link OutputFormatTableSink} instead.
  */
+@Deprecated
 public interface BatchTableSink<T> extends TableSink<T> {

	/** Emits the DataSet. */
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
similarity index 51%
copy from flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
copy to flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
index e3131f3..5ab2606 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/OutputFormatTableSink.java
@@ -18,15 +18,32 @@

 package org.apache.flink.table.sinks;

+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.Table;

 /**
- * Defines an external stream table and provides write access to its data.
+ * Defines an external {@link TableSink} to emit a bounded {@link Table}.
  *
- * @param <T> Type of the {@link DataStream} created by this {@link TableSink}.
+ * @param <T> Type of records that the bounded {@link OutputFormat} of this {@link TableSink} expects and supports.
  */
-public interface StreamTableSink<T> extends TableSink<T> {
+@Experimental
+public abstract class OutputFormatTableSink<T> implements StreamTableSink<T> {

-	/** Emits the DataStream. */
-	void emitDataStream(DataStream<T> dataStream);
+	/**
+	 * Returns an {@link OutputFormat} for writing the data of the table.
+	 */
+	public abstract OutputFormat<T> getOutputFormat();
+
+	@Override
+	public final void emitDataStream(DataStream<T> dataStream) {
+		consumeDataStream(dataStream);
+	}
+
+	@Override
+	public final DataStreamSink<T> consumeDataStream(DataStream<T> dataStream) {
+		return dataStream.writeUsingOutputFormat(getOutputFormat());
+	}
 }
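For illustration: with this class a user-defined sink only has to supply an OutputFormat, since emitDataStream and consumeDataStream are final in the base class. A minimal sketch against the API above (the class name TextFileTableSink and its outputPath parameter are illustrative, not part of this commit):

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.sinks.OutputFormatTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

/** Writes each Row as text to the given path (hypothetical example sink). */
public class TextFileTableSink extends OutputFormatTableSink<Row> {

	private final String outputPath;
	private String[] fieldNames;
	private TypeInformation<?>[] fieldTypes;

	public TextFileTableSink(String outputPath) {
		this.outputPath = outputPath;
	}

	// The only new method a subclass has to provide: hand the planner an OutputFormat.
	@Override
	public OutputFormat<Row> getOutputFormat() {
		return new TextOutputFormat<>(new Path(outputPath));
	}

	@Override
	public TypeInformation<Row> getOutputType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	@Override
	public String[] getFieldNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	// Per the TableSink contract, configure() returns a copy parameterized with the schema.
	@Override
	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
		TextFileTableSink copy = new TextFileTableSink(outputPath);
		copy.fieldNames = fieldNames;
		copy.fieldTypes = fieldTypes;
		return copy;
	}
}

Such a sink can then be registered like any other TableSink; the planner changes below take care of translating the query result into the OutputFormat.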
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
index e3131f3..825d5bb 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/sinks/StreamTableSink.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.sinks;

 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;

 /**
  * Defines an external stream table and provides write access to its data.
@@ -27,6 +28,24 @@ import org.apache.flink.streaming.api.datastream.DataStream;
  */
 public interface StreamTableSink<T> extends TableSink<T> {

-	/** Emits the DataStream. */
+	/**
+	 * Emits the DataStream.
+	 *
+	 * @deprecated This method will be removed in future versions as it returns nothing.
+	 * It is recommended to use {@link #consumeDataStream(DataStream)} instead, which
+	 * returns the {@link DataStreamSink}. The returned {@link DataStreamSink} will be
+	 * used to set resources for the sink operator. If {@link #consumeDataStream(DataStream)}
+	 * is implemented, this method can have an empty implementation.
+	 */
+	@Deprecated
 	void emitDataStream(DataStream<T> dataStream);
+
+	/**
+	 * Consumes the DataStream and returns the sink transformation {@link DataStreamSink}.
+	 * The returned {@link DataStreamSink} will be used to set resources for the sink operator.
+	 */
+	default DataStreamSink<?> consumeDataStream(DataStream<T> dataStream) {
+		emitDataStream(dataStream);
+		return null;
+	}
 }
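The default consumeDataStream implementation above exists only for backwards compatibility: it delegates to the deprecated emitDataStream and returns null, which the Blink planner rejects (see the BatchExecSink and StreamExecSink changes below). A sink migrating to the new contract overrides consumeDataStream and keeps emitDataStream as a thin delegate, roughly like this sketch (PrintTableSink is a made-up example class):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class PrintTableSink implements AppendStreamTableSink<Row> {

	private String[] fieldNames;
	private TypeInformation<?>[] fieldTypes;

	// New contract: return the DataStreamSink so the planner can configure
	// parallelism and resources on the sink transformation.
	@Override
	public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
		return dataStream.print();
	}

	// Old contract, kept only as a thin delegate for backwards compatibility.
	@Override
	public void emitDataStream(DataStream<Row> dataStream) {
		consumeDataStream(dataStream);
	}

	@Override
	public TypeInformation<Row> getOutputType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	@Override
	public String[] getFieldNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	@Override
	public TableSink<Row> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
		PrintTableSink copy = new PrintTableSink();
		copy.fieldNames = fieldNames;
		copy.fieldTypes = fieldTypes;
		return copy;
	}
}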
" + + s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.") + } + val sinkTransformation = dsSink.getTransformation if (sinkTransformation.getMaxParallelism > 0) { sinkTransformation.setParallelism(sinkTransformation.getMaxParallelism) @@ -97,15 +105,16 @@ class BatchExecSink[T]( } sinkTransformation - case streamTableSink: DataStreamTableSink[T] => + case dsTableSink: DataStreamTableSink[T] => // In case of table to bounded stream through BatchTableEnvironment#toBoundedStream, we // insert a DataStreamTableSink then wrap it as a LogicalSink, there is no real batch table // sink, so we do not need to invoke TableSink#emitBoundedStream and set resource, just a // translation to StreamTransformation is ok. - translateToStreamTransformation(withChangeFlag = streamTableSink.withChangeFlag, tableEnv) + translateToStreamTransformation(withChangeFlag = dsTableSink.withChangeFlag, tableEnv) case _ => - throw new TableException("Only Support BatchTableSink or DataStreamTableSink!") + throw new TableException(s"Only Support StreamTableSink! " + + s"However ${sink.getClass.getCanonicalName} is not a StreamTableSink.") } resultTransformation.asInstanceOf[StreamTransformation[Any]] } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala index 9ec0f6e..0bab82b 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/physical/stream/StreamExecSink.scala @@ -115,17 +115,24 @@ class StreamExecSink[T]( "RetractStreamTableSink, or UpsertStreamTableSink.") } val dataStream = new DataStream(tableEnv.execEnv, transformation) - streamTableSink.emitDataStream(dataStream).getTransformation + val dsSink = streamTableSink.consumeDataStream(dataStream) + if (dsSink == null) { + throw new TableException("The StreamTableSink#consumeDataStream(DataStream) must be " + + "implemented and return the sink transformation DataStreamSink. " + + s"However, ${sink.getClass.getCanonicalName} doesn't implement this method.") + } + dsSink.getTransformation - case streamTableSink: DataStreamTableSink[_] => + case dsTableSink: DataStreamTableSink[_] => // In case of table to stream through BatchTableEnvironment#translateToDataStream, // we insert a DataStreamTableSink then wrap it as a LogicalSink, there is no real batch // table sink, so we do not need to invoke TableSink#emitBoundedStream and set resource, // just a translation to StreamTransformation is ok. - translateToStreamTransformation(streamTableSink.withChangeFlag, tableEnv) + translateToStreamTransformation(dsTableSink.withChangeFlag, tableEnv) case _ => - throw new TableException("Only Support StreamTableSink or DataStreamTableSink!") + throw new TableException(s"Only Support StreamTableSink! 
" + + s"However ${sink.getClass.getCanonicalName} is not a StreamTableSink.") } resultTransformation.asInstanceOf[StreamTransformation[Any]] } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala deleted file mode 100644 index 8ab44b4..0000000 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/BatchTableSink.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.sinks - -import org.apache.flink.api.common.ExecutionConfig -import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} -import org.apache.flink.table.api._ - -/** Defines an external [[TableSink]] to emit a batch [[Table]]. - * - * @tparam T Type of [[DataStream]] that this [[TableSink]] expects and supports. - */ -trait BatchTableSink[T] extends TableSink[T] { - - /** Emits the DataStream. */ - def emitBoundedStream( - boundedStream: DataStream[T], - tableConfig: TableConfig, - executionConfig: ExecutionConfig): DataStreamSink[_] -} diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala index 8c50f57..40e3a02 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala @@ -18,7 +18,6 @@ package org.apache.flink.table.sinks -import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.accumulators.SerializedListAccumulator import org.apache.flink.api.common.io.RichOutputFormat import org.apache.flink.api.common.typeinfo.TypeInformation @@ -26,24 +25,24 @@ import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.configuration.Configuration import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink} -import org.apache.flink.table.api._ import org.apache.flink.types.Row /** * A simple [[TableSink]] to emit data as T to a collection. 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
index 8c50f57..40e3a02 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/CollectTableSink.scala
@@ -18,7 +18,6 @@

 package org.apache.flink.table.sinks

-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator
 import org.apache.flink.api.common.io.RichOutputFormat
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -26,24 +25,24 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-import org.apache.flink.table.api._
 import org.apache.flink.types.Row

 /**
  * A simple [[TableSink]] to emit data as T to a collection.
  */
 class CollectTableSink[T](produceOutputType: (Array[TypeInformation[_]] => TypeInformation[T]))
-  extends TableSinkBase[T] with BatchTableSink[T] {
+  extends TableSinkBase[T] with StreamTableSink[T] {

   private var collectOutputFormat: CollectOutputFormat[T] = _

-  override def emitBoundedStream(
-      boundedStream: DataStream[T],
-      tableConfig: TableConfig,
-      executionConfig: ExecutionConfig): DataStreamSink[T] = {
-    boundedStream.writeUsingOutputFormat(collectOutputFormat)
-      .setParallelism(1)
-      .name("collect")
+  override def consumeDataStream(dataStream: DataStream[T]): DataStreamSink[_] = {
+    dataStream.writeUsingOutputFormat(collectOutputFormat)
+      .setParallelism(1)
+      .name("collect")
+  }
+
+  override def emitDataStream(dataStream: DataStream[T]): Unit = {
+    consumeDataStream(dataStream)
   }

   override protected def copy: TableSinkBase[T] = {

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
index 2ee0449..2d7c46f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/RetractStreamTableSink.scala
@@ -18,17 +18,28 @@

 package org.apache.flink.table.sinks

+import java.lang.{Boolean => JBool}
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
-import org.apache.flink.table.api.{Table, Types}
+import org.apache.flink.table.api.Types

-import java.lang.{Boolean => JBool}
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.table.api.Table

 /**
  * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
  * changes.
  *
+ * The table will be converted into a stream of accumulate and retract messages which are
+ * encoded as [[JTuple2]].
+ * The first field is a [[JBool]] flag to indicate the message type.
+ * The second field holds the record of the requested type [[T]].
+ *
+ * A message with a true [[JBool]] flag is an accumulate (or add) message.
+ * A message with a false flag is a retract message.
+ *
 * @tparam T Type of records that this [[TableSink]] expects and supports.
 */
trait RetractStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
@@ -36,6 +47,9 @@ trait RetractStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
   /** Returns the requested record type */
   def getRecordType: TypeInformation[T]

+  /** Emits the DataStream. */
+  def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit
+
   override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
 }
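To make the encoding concrete: a retract sink receives Tuple2<Boolean, Row> messages and typically branches on the flag. A sketch against the Java bridge's RetractStreamTableSink (LoggingRetractSink is illustrative only, not part of this commit):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.sinks.RetractStreamTableSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;

public class LoggingRetractSink implements RetractStreamTableSink<Row> {

	private String[] fieldNames;
	private TypeInformation<?>[] fieldTypes;

	@Override
	public TypeInformation<Row> getRecordType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	@Override
	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
		// f0 == true  -> accumulate (add) message
		// f0 == false -> retract message
		return dataStream.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
			@Override
			public String map(Tuple2<Boolean, Row> msg) {
				return (msg.f0 ? "+ " : "- ") + msg.f1;
			}
		}).print();
	}

	@Override
	public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
		consumeDataStream(dataStream);
	}

	// The output type wraps the record type into the Tuple2 encoding described above.
	@Override
	public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
		return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
	}

	@Override
	public String[] getFieldNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	@Override
	public TableSink<Tuple2<Boolean, Row>> configure(String[] names, TypeInformation<?>[] types) {
		LoggingRetractSink copy = new LoggingRetractSink();
		copy.fieldNames = names;
		copy.fieldTypes = types;
		return copy;
	}
}

The same Tuple2 encoding is used by UpsertStreamTableSink below, with upsert/delete semantics per key instead of accumulate/retract.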
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
deleted file mode 100644
index 3c4b556..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/StreamTableSink.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sinks
-
-import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
-
-/**
- * Defines an external stream table and provides write access to its data.
- *
- * @tparam T Type of the [[DataStream]] created by this [[TableSink]].
- */
-trait StreamTableSink[T] extends TableSink[T] {
-
-  /** Emits the DataStream. */
-  def emitDataStream(dataStream: DataStream[T]): DataStreamSink[_]
-
-}

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
index 330faac..d3be507 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/sinks/UpsertStreamTableSink.scala
@@ -18,14 +18,14 @@

 package org.apache.flink.table.sinks

+import java.lang.{Boolean => JBool}
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api.{Table, Types}

-import java.lang.{Boolean => JBool}
-
-
 /**
  * Defines an external [[TableSink]] to emit a streaming [[Table]] with insert, update, and delete
  * changes. The [[Table]] must have unique key fields (atomic or composite) or be append-only.
@@ -36,6 +36,13 @@ import java.lang.{Boolean => JBool}
  * The unique key of the table is configured by the [[UpsertStreamTableSink#setKeyFields()]]
  * method.
  *
+ * The [[Table]] will be converted into a stream of upsert and delete messages which are encoded
+ * as [[JTuple2]]. The first field is a [[JBool]] flag to indicate the message type. The second
+ * field holds the record of the requested type [[T]].
+ *
+ * A message with a true [[JBool]] flag is an upsert message for the configured key.
+ * A message with a false flag is a delete message for the configured key.
+ *
 * If the table is append-only, all messages will have a true flag and must be interpreted
 * as insertions.
 *
@@ -65,6 +72,9 @@ trait UpsertStreamTableSink[T] extends StreamTableSink[JTuple2[JBool, T]] {
   /** Returns the requested record type */
   def getRecordType: TypeInformation[T]

-  override def getOutputType = new TupleTypeInfo(Types.BOOLEAN, getRecordType)
+  /** Emits the DataStream. */
+  def emitDataStream(dataStream: DataStream[JTuple2[JBool, T]]): Unit
+
+  override def getOutputType: TypeInformation[JTuple2[JBool, T]] =
+    new TupleTypeInfo(Types.BOOLEAN, getRecordType)
 }
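An upsert sink additionally receives the unique key of the query via setKeyFields and the append-only property via setIsAppendOnly before translation. A minimal illustrative sketch against the Java bridge interface (LoggingUpsertSink is not part of this commit):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.types.Row;

public class LoggingUpsertSink implements UpsertStreamTableSink<Row> {

	private String[] keys;
	private String[] fieldNames;
	private TypeInformation<?>[] fieldTypes;

	@Override
	public void setKeyFields(String[] keys) {
		// The planner passes the unique key derived from the query, or null if there is none.
		this.keys = keys;
	}

	@Override
	public void setIsAppendOnly(Boolean isAppendOnly) {
		// True if only messages with a true flag (inserts) will arrive.
	}

	@Override
	public TypeInformation<Row> getRecordType() {
		return new RowTypeInfo(fieldTypes, fieldNames);
	}

	@Override
	public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
		return dataStream.map(new MapFunction<Tuple2<Boolean, Row>, String>() {
			@Override
			public String map(Tuple2<Boolean, Row> msg) {
				// true flag -> upsert for the configured key, false flag -> delete
				return (msg.f0 ? "UPSERT " : "DELETE ") + msg.f1;
			}
		}).print();
	}

	@Override
	public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
		consumeDataStream(dataStream);
	}

	@Override
	public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
		return new TupleTypeInfo<>(Types.BOOLEAN, getRecordType());
	}

	@Override
	public String[] getFieldNames() {
		return fieldNames;
	}

	@Override
	public TypeInformation<?>[] getFieldTypes() {
		return fieldTypes;
	}

	@Override
	public TableSink<Tuple2<Boolean, Row>> configure(String[] names, TypeInformation<?>[] types) {
		LoggingUpsertSink copy = new LoggingUpsertSink();
		copy.fieldNames = names;
		copy.fieldTypes = types;
		return copy;
	}
}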
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
index 4fa7b6d..31ab797 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/StreamTestSink.scala
@@ -18,7 +18,6 @@

 package org.apache.flink.table.runtime.utils

-import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.MapFunction
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
@@ -30,7 +29,7 @@ import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
 import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
-import org.apache.flink.table.api.{TableConfig, Types}
+import org.apache.flink.table.api.Types
 import org.apache.flink.table.dataformat.{BaseRow, DataFormatConverters, GenericRow}
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter
@@ -276,7 +275,8 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)

   override def getFieldTypes: Array[TypeInformation[_]] = fTypes

-  override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, BaseRow]]) = {
+  override def consumeDataStream(
+      dataStream: DataStream[JTuple2[JBoolean, BaseRow]]): DataStreamSink[_] = {
     dataStream.map(new MapFunction[JTuple2[JBoolean, BaseRow], (Boolean, BaseRow)] {
       override def map(value: JTuple2[JBoolean, BaseRow]): (Boolean, BaseRow) = {
         (value.f0, value.f1)
@@ -294,6 +294,10 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)
       .setParallelism(dataStream.getParallelism)
   }

+  override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, BaseRow]]): Unit = {
+    consumeDataStream(dataStream)
+  }
+
   override def configure(
       fieldNames: Array[String],
       fieldTypes: Array[TypeInformation[_]]): TestingUpsertTableSink = {
@@ -310,8 +314,7 @@ final class TestingUpsertTableSink(val keys: Array[Int], val tz: TimeZone)

   def getUpsertResults: List[String] = sink.getUpsertResults
 }

-final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row]
-  with BatchTableSink[Row] {
+final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row] {

   var fNames: Array[String] = _
   var fTypes: Array[TypeInformation[_]] = _
   var sink = new TestingAppendSink(tz)
@@ -321,16 +324,13 @@ final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row]
     this(TimeZone.getTimeZone("UTC"))
   }

-  override def emitDataStream(dataStream: DataStream[Row]): DataStreamSink[Row] = {
+  override def consumeDataStream(dataStream: DataStream[Row]): DataStreamSink[_] = {
     dataStream.addSink(sink).name("TestingAppendTableSink")
       .setParallelism(dataStream.getParallelism)
   }

-  override def emitBoundedStream(
-      boundedStream: DataStream[Row],
-      tableConfig: TableConfig,
-      executionConfig: ExecutionConfig): DataStreamSink[Row] = {
-    boundedStream.writeUsingOutputFormat(outputFormat).name("appendTableSink")
+  override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+    consumeDataStream(dataStream)
   }

   override def getOutputType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
@@ -352,7 +352,7 @@ final class TestingAppendTableSink(tz: TimeZone) extends AppendStreamTableSink[Row]

   def getAppendResults: List[String] = sink.getAppendResults

-  def getResults: List[String] = outputFormat.getResults
+  def getResults: List[String] = sink.getAppendResults
 }

 class TestingOutputFormat[T](tz: TimeZone)
@@ -483,7 +483,8 @@ final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink
     this(TimeZone.getTimeZone("UTC"))
   }

-  override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, Row]]) = {
+  override def consumeDataStream(
+      dataStream: DataStream[JTuple2[JBoolean, Row]]): DataStreamSink[_] = {
     dataStream.map(new MapFunction[JTuple2[JBoolean, Row], (Boolean, Row)] {
       override def map(value: JTuple2[JBoolean, Row]): (Boolean, Row) = {
         (value.f0, value.f1)
@@ -494,6 +495,10 @@ final class TestingRetractTableSink(tz: TimeZone) extends RetractStreamTableSink
       .setParallelism(dataStream.getParallelism)
   }

+  override def emitDataStream(dataStream: DataStream[JTuple2[JBoolean, Row]]): Unit = {
+    consumeDataStream(dataStream)
+  }
+
   override def getRecordType: TypeInformation[Row] = new RowTypeInfo(fTypes, fNames)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
index 2e6401d..7c7eefe 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/BatchTableEnvImpl.scala
@@ -71,16 +71,16 @@ abstract class BatchTableEnvImpl(

     if (!tableSource.isInstanceOf[BatchTableSource[_]] &&
       !tableSource.isInstanceOf[InputFormatTableSource[_]]) {
-      throw new TableException("Only BatchTableSource and InputFormatTableSource can be registered " +
-        "in BatchTableEnvironment.")
+      throw new TableException("Only BatchTableSource and InputFormatTableSource " +
+        "can be registered in BatchTableEnvironment.")
     }
   }

   override protected def validateTableSink(configuredSink: TableSink[_]): Unit = {
     if (!configuredSink.isInstanceOf[BatchTableSink[_]] &&
-      !configuredSink.isInstanceOf[BoundedTableSink[_]]) {
-      throw new TableException("Only BatchTableSink and BoundedTableSink can be registered " +
-        "in BatchTableEnvironment.")
+      !configuredSink.isInstanceOf[OutputFormatTableSink[_]]) {
+      throw new TableException("Only BatchTableSink and OutputFormatTableSink " +
+        "can be registered in BatchTableEnvironment.")
     }
   }

@@ -119,8 +119,20 @@ abstract class BatchTableEnvImpl(
         val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
         // Give the DataSet to the TableSink to emit it.
         batchSink.emitDataSet(result)
+      case boundedSink: OutputFormatTableSink[T] =>
+        val outputType = fromDataTypeToLegacyInfo(sink.getConsumedDataType)
+          .asInstanceOf[TypeInformation[T]]
+        // translate the Table into a DataSet and provide the type that the TableSink expects.
+        val result: DataSet[T] = translate(table, batchQueryConfig)(outputType)
+        // use the OutputFormat to consume the DataSet.
+        val dataSink = result.output(boundedSink.getOutputFormat)
+        dataSink.name(
+          TableConnectorUtils.generateRuntimeName(
+            boundedSink.getClass,
+            boundedSink.getTableSchema.getFieldNames))
       case _ =>
-        throw new TableException("BatchTableSink required to emit batch Table.")
+        throw new TableException(
+          "BatchTableSink or OutputFormatTableSink required to emit batch Table.")
     }
   }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
index 9078560..c351a46 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/StreamTableEnvImpl.scala
@@ -156,7 +156,7 @@ abstract class StreamTableEnvImpl(
           withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         retractSink.asInstanceOf[RetractStreamTableSink[Any]]
-          .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
+          .consumeDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])

       case upsertSink: UpsertStreamTableSink[_] =>
         // optimize plan
@@ -185,7 +185,7 @@ abstract class StreamTableEnvImpl(
           withChangeFlag = true)(outputType)
         // Give the DataStream to the TableSink to emit it.
         upsertSink.asInstanceOf[UpsertStreamTableSink[Any]]
-          .emitDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])
+          .consumeDataStream(result.asInstanceOf[DataStream[JTuple2[JBool, Any]]])

       case appendSink: AppendStreamTableSink[_] =>
         // optimize plan
@@ -206,7 +206,7 @@ abstract class StreamTableEnvImpl(
           streamQueryConfig,
           withChangeFlag = false)(outputType)
         // Give the DataStream to the TableSink to emit it.
-        appendSink.asInstanceOf[AppendStreamTableSink[T]].emitDataStream(result)
+        appendSink.asInstanceOf[AppendStreamTableSink[T]].consumeDataStream(result)

       case _ =>
         throw new TableException("Stream Tables can only be emitted by AppendStreamTableSink, " +
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
index d630950..adb1a40 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSinkITCase.scala
@@ -19,7 +19,6 @@
 package org.apache.flink.table.runtime.batch.table

 import java.io.File
-
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.scala.{ExecutionEnvironment, _}
@@ -28,11 +27,15 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase
 import org.apache.flink.table.runtime.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil
+import org.apache.flink.table.utils.MemoryTableSourceSinkUtil.UnsafeMemoryOutputFormatTableSink
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized

+import scala.collection.JavaConverters._
+
 @RunWith(classOf[Parameterized])
 class TableSinkITCase(
     configMode: TableConfigMode)
@@ -70,4 +73,32 @@ class TableSinkITCase(

     TestBaseUtils.compareResultsByLinesInMemory(expected, path)
   }
+
+  @Test
+  def testBoundedTableSink(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = BatchTableEnvironment.create(env, config)
+
+    val fieldNames = Array("c", "b")
+    val fieldTypes: Array[TypeInformation[_]] = Array(Types.STRING, Types.LONG)
+    val sink = new UnsafeMemoryOutputFormatTableSink
+    tEnv.registerTableSink("testSink", sink.configure(fieldNames, fieldTypes))
+
+    val input = CollectionDataSets.get3TupleDataSet(env)
+      .map(x => x).setParallelism(4) // increase DOP to 4
+
+    input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .insertInto("testSink")
+
+    env.execute()
+
+    val results = MemoryTableSourceSinkUtil.tableDataStrings.asJava
+    val expected = Seq(
+      "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
+      "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
+
+    TestBaseUtils.compareResultAsText(results, expected)
+  }
 }
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
index 37342c9..5559c62 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MemoryTableSourceSinkUtil.scala
@@ -19,8 +19,7 @@
 package org.apache.flink.table.utils

 import java.util
-
-import org.apache.flink.api.common.io.RichOutputFormat
+import org.apache.flink.api.common.io.{OutputFormat, RichOutputFormat}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
@@ -31,7 +30,7 @@ import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
 import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, TableSinkBase}
+import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources._
 import org.apache.flink.types.Row

@@ -126,6 +125,31 @@ object MemoryTableSourceSinkUtil {
     }
   }

+  final class UnsafeMemoryOutputFormatTableSink extends OutputFormatTableSink[Row] {
+
+    var fieldNames: Array[String] = _
+    var fieldTypes: Array[TypeInformation[_]] = _
+
+    override def getOutputType: TypeInformation[Row] = {
+      new RowTypeInfo(getTableSchema.getFieldTypes, getTableSchema.getFieldNames)
+    }
+
+    override def getOutputFormat: OutputFormat[Row] = new MemoryCollectionOutputFormat
+
+    override def configure(
+        fieldNames: Array[String],
+        fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {
+      val newSink = new UnsafeMemoryOutputFormatTableSink
+      newSink.fieldNames = fieldNames
+      newSink.fieldTypes = fieldTypes
+      newSink
+    }
+
+    override def getFieldNames: Array[String] = fieldNames
+
+    override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+  }
+
   private class MemoryAppendSink extends RichSinkFunction[Row]() {

     override def invoke(value: Row): Unit = {
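MemoryCollectionOutputFormat is referenced above but its body lies outside the changed hunks. A rough stand-in with the same role (collecting rows into a shared in-memory buffer) could look like the following; this is a hypothetical reconstruction in Java for consistency with the sketches above, while the actual class is Scala and backs MemoryTableSourceSinkUtil.tableDataStrings:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;

/** Collects every emitted Row into a static in-memory list (test use only, hence "unsafe"). */
public class MemoryCollectionOutputFormat extends RichOutputFormat<Row> {

	// Shared across parallel subtasks in the same JVM, mirroring tableDataStrings.
	public static final List<String> RESULTS = new ArrayList<>();

	@Override
	public void configure(Configuration parameters) {
		// nothing to configure
	}

	@Override
	public void open(int taskNumber, int numTasks) throws IOException {
		// no resources to open
	}

	@Override
	public void writeRecord(Row record) throws IOException {
		synchronized (RESULTS) {
			// store a comma-separated textual form, matching what the ITCase compares against
			StringBuilder sb = new StringBuilder();
			for (int i = 0; i < record.getArity(); i++) {
				if (i > 0) {
					sb.append(',');
				}
				sb.append(record.getField(i));
			}
			RESULTS.add(sb.toString());
		}
	}

	@Override
	public void close() throws IOException {
		// nothing to close
	}
}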
