[
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433146#comment-15433146
]
ASF GitHub Bot commented on FLINK-4035:
---------------------------------------
Github user rmetzger commented on a diff in the pull request:
https://github.com/apache/flink/pull/2369#discussion_r75902757
--- Diff:
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import
org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import
org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import
org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is
compatible with Kafka 0.10.x.
+ *
+ * Implementation note: This Producer wraps a Flink Kafka 0.9 Producer,
overriding only
+ * the "processElement" / "invoke" method.
+ */
+public class FlinkKafkaProducer010<T> extends StreamSink<T> {
+
+ /**
+ * Flag controlling whether we are writing the Flink record's timestamp
into Kafka.
+ */
+ private boolean writeTimestampToKafka = false;
+
+ // ---------------------- "Constructors" for the producer
------------------ //
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a
DataStream to
+ * the topic.
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema
supporting key/value messages
+ * @param producerConfig Properties with the producer configuration.
+ */
+ public static <T> FlinkKafkaProducer010Configuration
writeToKafka(DataStream<T> inStream,
+
String topicId,
+
KeyedSerializationSchema<T> serializationSchema,
+
Properties producerConfig) {
+ return writeToKafka(inStream, topicId, serializationSchema,
producerConfig, new FixedPartitioner<T>());
+ }
+
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a
DataStream to
+ * the topic.
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined (keyless) serialization
schema.
+ * @param producerConfig Properties with the producer configuration.
+ */
+ public static <T> FlinkKafkaProducer010Configuration
writeToKafka(DataStream<T> inStream,
+
String topicId,
+
SerializationSchema<T> serializationSchema,
+
Properties producerConfig) {
+ return writeToKafka(inStream, topicId, new
KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new
FixedPartitioner<T>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a
DataStream to
+ * the topic.
+ * @param inStream The stream to write to Kafka
+ * @param topicId The name of the target topic
+ * @param serializationSchema A serializable serialization schema for
turning user objects into a kafka-consumable byte[] supporting key/value
messages
+ * @param producerConfig Configuration properties for the
KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning
messages to Kafka partitions.
+ */
+ public static <T> FlinkKafkaProducer010Configuration<T>
writeToKafka(DataStream<T> inStream,
+
String topicId,
+
KeyedSerializationSchema<T>
serializationSchema,
+
Properties producerConfig,
+
KafkaPartitioner<T>
customPartitioner) {
+ GenericTypeInfo<Object> objectTypeInfo = new
GenericTypeInfo<>(Object.class);
+ FlinkKafkaProducer010<T> kafkaProducer = new
FlinkKafkaProducer010<>(topicId, serializationSchema, producerConfig,
customPartitioner);
+ SingleOutputStreamOperator<Object> transformation =
inStream.transform("FlinKafkaProducer 0.10.x", objectTypeInfo, kafkaProducer);
+ return new FlinkKafkaProducer010Configuration<>(transformation,
kafkaProducer);
+ }
+
+ /**
+ * Configuration object returned by the writeToKafka() call.
+ */
+ public static class FlinkKafkaProducer010Configuration<T> extends
DataStreamSink<T> {
--- End diff --
I'll update it.
> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Elias Levy
> Assignee: Robert Metzger
> Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.
> Published messages now include timestamps and compressed messages now include
> relative offsets. As it is now, brokers must decompress publisher-compressed
> messages, assign offsets to them, and recompress them, which is wasteful and
> makes it less likely that compression will be used at all.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)