[
https://issues.apache.org/jira/browse/FLINK-9846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16552644#comment-16552644
]
ASF GitHub Bot commented on FLINK-9846:
---------------------------------------
Github user pnowojski commented on a diff in the pull request:
https://github.com/apache/flink/pull/6387#discussion_r204348635
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java ---
@@ -82,49 +129,97 @@ public KafkaTableSink(
 	 *
 	 * @param rowSchema the schema of the row to serialize.
 	 * @return Instance of serialization schema
+	 * @deprecated Use the constructor to pass a serialization schema instead.
 	 */
-	protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
+	@Deprecated
+	protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+	}

 	/**
 	 * Create a deep copy of this sink.
 	 *
 	 * @return Deep copy of this sink
 	 */
-	protected abstract KafkaTableSink createCopy();
+	@Deprecated
+	protected KafkaTableSink createCopy() {
+		throw new UnsupportedOperationException("This method only exists for backwards compatibility.");
+	}

 	@Override
 	public void emitDataStream(DataStream<Row> dataStream) {
-		FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
-		// always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
-		kafkaProducer.setFlushOnCheckpoint(true);
+		SinkFunction<Row> kafkaProducer = createKafkaProducer(
+			topic,
+			properties,
+			serializationSchema.orElseThrow(() -> new IllegalStateException("No serialization schema defined.")),
+			partitioner);
 		dataStream.addSink(kafkaProducer).name(TableConnectorUtil.generateRuntimeName(this.getClass(), fieldNames));
 	}

 	@Override
 	public TypeInformation<Row> getOutputType() {
-		return new RowTypeInfo(getFieldTypes());
+		return schema
+			.map(TableSchema::toRowType)
+			.orElseGet(() -> new RowTypeInfo(getFieldTypes()));
 	}

 	public String[] getFieldNames() {
-		return fieldNames;
+		return schema.map(TableSchema::getColumnNames).orElse(fieldNames);
 	}

 	@Override
 	public TypeInformation<?>[] getFieldTypes() {
-		return fieldTypes;
+		return schema.map(TableSchema::getTypes).orElse(fieldTypes);
 	}

 	@Override
 	public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
+		// a fixed schema is defined so reconfiguration is not supported
--- End diff ---
Move this comment into the exception message, so the reason is visible to callers and not only to readers of the source.
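A hedged sketch of what the reviewer's suggestion could look like once applied. This is not the actual Flink code: the class name, the simplified `Object[]` field types, and the exact exception message are assumptions made so the sketch compiles standalone; only the pattern (carry the "fixed schema" reason in the exception itself) comes from the review.

```java
// Illustrative sketch, not the real KafkaTableSink: shows the review
// suggestion of moving the inline "fixed schema" comment into the
// exception description thrown by configure().
public class ConfigureSketch {

	// Hypothetical stand-in for KafkaTableSink#configure; field types are
	// simplified to Object[] so the sketch needs no Flink dependencies.
	public static ConfigureSketch configure(String[] fieldNames, Object[] fieldTypes) {
		// The reason now travels with the exception instead of a code comment.
		throw new UnsupportedOperationException(
			"Reconfiguration of this sink is not supported because a fixed schema is already defined.");
	}

	public static void main(String[] args) {
		try {
			configure(new String[] {"f0"}, new Object[] {null});
		} catch (UnsupportedOperationException e) {
			System.out.println(e.getMessage());
		}
	}
}
```

With the message on the exception, a failing `configure()` call explains itself in logs and stack traces, which is the point of the reviewer's comment.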
> Add a Kafka table sink factory
> ------------------------------
>
> Key: FLINK-9846
> URL: https://issues.apache.org/jira/browse/FLINK-9846
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
>
> FLINK-8866 implements a unified way of creating sinks and using the format
> discovery for searching for formats (FLINK-8858). It is now possible to add a
> Kafka table sink factory for streaming environment that uses the new
> interfaces.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)