This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new c7ddabe  [FLINK-13341][table][connectors] 
StreamTableSink#consumeDataStream returns DataStreamSink when using blink 
planner.
c7ddabe is described below

commit c7ddabe66d17263336a7ad9176c1ef566c247167
Author: chenqi <[email protected]>
AuthorDate: Sat Jul 20 23:22:43 2019 +0800

    [FLINK-13341][table][connectors] StreamTableSink#consumeDataStream returns 
DataStreamSink when using blink planner.
    
    This closes #9186
---
 .../cassandra/CassandraAppendTableSink.java        | 31 +++++++++++++++-------
 .../ElasticsearchUpsertTableSinkBase.java          | 10 +++++--
 .../connectors/kafka/KafkaTableSinkBase.java       | 10 +++++--
 .../api/java/io/jdbc/JDBCAppendTableSink.java      | 12 ++++++---
 4 files changed, 47 insertions(+), 16 deletions(-)

diff --git 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
index 443a780..edf8c88 100644
--- 
a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
+++ 
b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraAppendTableSink.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.connectors.cassandra;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.utils.TableConnectorUtils;
 import org.apache.flink.types.Row;
@@ -76,15 +78,26 @@ public class CassandraAppendTableSink implements 
AppendStreamTableSink<Row> {
        }
 
        @Override
-       public void emitDataStream(DataStream<Row> dataStream) {
-               try {
-                       CassandraSink.addSink(dataStream)
-                               .setClusterBuilder(this.builder)
-                               .setQuery(this.cql)
-                               .build()
-                               
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
+       public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+               if (!(dataStream.getType() instanceof RowTypeInfo)) {
+                       throw new TableException("No support for the type of 
the given DataStream: " + dataStream.getType());
                }
+
+               CassandraRowSink sink = new CassandraRowSink(
+                       dataStream.getType().getArity(),
+                       cql,
+                       builder,
+                       CassandraSinkBaseConfig.newBuilder().build(),
+                       new NoOpCassandraFailureHandler());
+
+               return dataStream
+                               .addSink(sink)
+                               
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
+
+       }
+
+       @Override
+       public void emitDataStream(DataStream<Row> dataStream) {
+               consumeDataStream(dataStream);
        }
 }
diff --git 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
index 045d3a6..427bb6e 100644
--- 
a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
+++ 
b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchUpsertTableSinkBase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
@@ -168,7 +169,7 @@ public abstract class ElasticsearchUpsertTableSinkBase 
implements UpsertStreamTa
        }
 
        @Override
-       public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) 
{
+       public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, 
Row>> dataStream) {
                final ElasticsearchUpsertSinkFunction upsertFunction =
                        new ElasticsearchUpsertSinkFunction(
                                index,
@@ -184,11 +185,16 @@ public abstract class ElasticsearchUpsertTableSinkBase 
implements UpsertStreamTa
                        failureHandler,
                        sinkOptions,
                        upsertFunction);
-               dataStream.addSink(sinkFunction)
+               return dataStream.addSink(sinkFunction)
                        
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), 
getFieldNames()));
        }
 
        @Override
+       public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) 
{
+               consumeDataStream(dataStream);
+       }
+
+       @Override
        public TypeInformation<Tuple2<Boolean, Row>> getOutputType() {
                return Types.TUPLE(Types.BOOLEAN, getRecordType());
        }
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
index ce46eb6..ecadace 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import 
org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.table.api.TableSchema;
@@ -89,13 +90,18 @@ public abstract class KafkaTableSinkBase implements 
AppendStreamTableSink<Row> {
                Optional<FlinkKafkaPartitioner<Row>> partitioner);
 
        @Override
-       public void emitDataStream(DataStream<Row> dataStream) {
+       public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
                final SinkFunction<Row> kafkaProducer = createKafkaProducer(
                        topic,
                        properties,
                        serializationSchema,
                        partitioner);
-               
dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(),
 getFieldNames()));
+               return 
dataStream.addSink(kafkaProducer).name(TableConnectorUtils.generateRuntimeName(this.getClass(),
 getFieldNames()));
+       }
+
+       @Override
+       public void emitDataStream(DataStream<Row> dataStream) {
+               consumeDataStream(dataStream);
        }
 
        @Override
diff --git 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
index 5df66f6..c53bbc1 100644
--- 
a/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
+++ 
b/flink-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.BatchTableSink;
 import org.apache.flink.table.sinks.TableSink;
@@ -60,10 +61,15 @@ public class JDBCAppendTableSink implements 
AppendStreamTableSink<Row>, BatchTab
        }
 
        @Override
+       public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+               return dataStream
+                       .addSink(new JDBCSinkFunction(outputFormat))
+                       
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
+       }
+
+       @Override
        public void emitDataStream(DataStream<Row> dataStream) {
-               dataStream
-                               .addSink(new JDBCSinkFunction(outputFormat))
-                               
.name(TableConnectorUtils.generateRuntimeName(this.getClass(), fieldNames));
+               consumeDataStream(dataStream);
        }
 
        @Override

Reply via email to