[
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429179#comment-15429179
]
ASF GitHub Bot commented on FLINK-4035:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2369#discussion_r75571320
--- Diff:
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is
compatible with Kafka 0.10.x
+ *
+ * Implementation note: This Producer wraps a Flink Kafka 0.9 Producer,
overriding only
+ * the "processElement" / "invoke" method.
+ */
+public class FlinkKafkaProducer010<T> extends StreamSink<T> {
--- End diff --
Overall, I think the solution here seems a bit too "hacky" to me. Here's a
few disadvantages I see: 1) the code in `processElement()` of this class is
still quite a bit duplicate of the `invoke()` in `FlinkKafaProducerBase`. 2)
with `writeToKafka()` we need to have a different usage style for the Kafka
producer in 0.10, compared to previous versions.
If I'm correct, I guess its due to the fact that we don't have access to
the embedded record timestamp in the provided `next` in the usual `invoke`?
I'm wondering whether or not a cleaner solution is to introduce a
`serializeTimestamp(T element)` method in the `KeyedSerializationSchema`, and
we can simply use that to make `FlinkKafkaProducerBase#invoke()` more general
by replacing instantiation of `ProducerRecord`s with an abstract
version-specific method. Then, `FlinkKafkaProducer010` can simply extend
`FlinkKafkaProducer09`.
We'll need a migration plan though if we're going to change the
serialization schema interface. If we're going for it, might as well migrate
the `KeyedDeserializationSchema` to include the timestamp too (give `null` for
0.8, 0.9).
Otherwise, I think the solution here is ok for a short term solution. What
do you think?
> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Elias Levy
> Assignee: Robert Metzger
> Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.
> Published messages now include timestamps and compressed messages now include
> relative offsets. As it is now, brokers must decompress publisher compressed
> messages, assign offset to them, and recompress them, which is wasteful and
> makes it less likely that compression will be used at all.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)