[
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15525262#comment-15525262
]
ASF GitHub Bot commented on FLINK-4035:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2369#discussion_r80628475
--- Diff:
flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java
---
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
+import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+import java.util.Properties;
+
+import static org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.getPropertiesFromBrokerList;
+
+
+/**
+ * Flink Sink to produce data into a Kafka topic. This producer is compatible with Kafka 0.10.x.
+ *
+ * Implementation note: This producer is a hybrid between a regular sink function (a)
+ * and a custom operator (b).
+ *
+ * For (a), the class implements the SinkFunction and RichFunction interfaces.
+ * For (b), it extends the StreamSink class.
+ *
+ * Details about approach (a):
+ *
+ * Pre Kafka 0.10 producers only follow approach (a), allowing users to use the producer using the
+ * DataStream.addSink() method.
+ * Since the APIs exposed in that variant do not allow accessing the timestamp attached to the record,
+ * the Kafka 0.10 producer has a second invocation option, approach (b).
+ *
+ * Details about approach (b):
+ * Kafka 0.10 supports writing the timestamp attached to a record to Kafka. When adding the
+ * FlinkKafkaProducer010 using the FlinkKafkaProducer010.writeToKafkaWithTimestamps() method, the Kafka producer
+ * can access the internal record timestamp of the record and write it to Kafka.
+ *
+ * All methods and constructors in this class are marked with the approach they are needed for.
+ */
+public class FlinkKafkaProducer010<T> extends StreamSink<T> implements SinkFunction<T>, RichFunction {
+
+ /**
+ * Flag controlling whether we are writing the Flink record's timestamp into Kafka.
+ */
+ private boolean writeTimestampToKafka = false;
+
+ // ---------------------- "Constructors" for timestamp writing ------------------
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined serialization schema supporting key/value messages
+ * @param producerConfig Properties with the producer configuration.
+ */
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+         String topicId,
+         KeyedSerializationSchema<T> serializationSchema,
+         Properties producerConfig) {
+ return writeToKafkaWithTimestamps(inStream, topicId, serializationSchema, producerConfig, new FixedPartitioner<T>());
+ }
+
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId ID of the Kafka topic.
+ * @param serializationSchema User defined (keyless) serialization schema.
+ * @param producerConfig Properties with the producer configuration.
+ */
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+         String topicId,
+         SerializationSchema<T> serializationSchema,
+         Properties producerConfig) {
+ return writeToKafkaWithTimestamps(inStream, topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, new FixedPartitioner<T>());
+ }
+
+ /**
+ * Creates a FlinkKafkaProducer for a given topic. The sink produces a DataStream to
+ * the topic.
+ *
+ * This constructor allows writing timestamps to Kafka; it follows approach (b) (see above).
+ *
+ * @param inStream The stream to write to Kafka
+ * @param topicId The name of the target topic
+ * @param serializationSchema A serializable serialization schema for turning user objects into a kafka-consumable byte[] supporting key/value messages
+ * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers' is the only required argument.
+ * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
+ */
+ public static <T> FlinkKafkaProducer010Configuration<T> writeToKafkaWithTimestamps(DataStream<T> inStream,
+         String topicId,
+         KeyedSerializationSchema<T> serializationSchema,
+         Properties producerConfig,
+         KafkaPartitioner<T> customPartitioner) {
--- End diff --
Checkstyle failed for these few lines here; they have leading spaces :P
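A leading-space failure like this can be caught locally before pushing. The following is only a minimal sketch, not the project's actual Checkstyle rule: it assumes the convention is "indent with tabs only, so no space may appear in a line's leading whitespace", and it skips comment continuation lines starting with `*`. The names `LeadingSpaceCheck` and `findViolations` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class LeadingSpaceCheck {

    /** Returns the 1-based numbers of lines whose indentation contains a space. */
    static List<Integer> findViolations(List<String> lines) {
        List<Integer> violations = new ArrayList<>();
        for (int i = 0; i < lines.size(); i++) {
            String line = lines.get(i);
            int pos = 0;
            boolean sawSpace = false;
            // Walk the leading whitespace and remember whether it contains a space.
            while (pos < line.length() && (line.charAt(pos) == ' ' || line.charAt(pos) == '\t')) {
                sawSpace |= line.charAt(pos) == ' ';
                pos++;
            }
            boolean blank = pos == line.length();
            // Assumption: " * ..." comment continuation lines are exempt from the rule.
            boolean commentContinuation = !blank && line.charAt(pos) == '*';
            if (sawSpace && !blank && !commentContinuation) {
                violations.add(i + 1);
            }
        }
        return violations;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
            "\tpublic static <T> Foo<T> write(DataStream<T> inStream,",
            "\t\t\t    String topicId,",            // tabs followed by spaces: flagged
            "        Properties producerConfig) {", // spaces only: flagged
            "\t * Javadoc continuation line",       // skipped by assumption
            "\t}");
        System.out.println(findViolations(sample)); // prints [2, 3]
    }
}
```

Continuation parameters aligned with spaces under the opening parenthesis, as in the signatures above, are the typical trigger; indenting them with tabs avoids the violation under this assumed rule.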
> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Elias Levy
> Assignee: Robert Metzger
> Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.
> Published messages now include timestamps and compressed messages now include
> relative offsets. As it is now, brokers must decompress publisher compressed
> messages, assign offset to them, and recompress them, which is wasteful and
> makes it less likely that compression will be used at all.
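To illustrate why relative offsets remove the recompression step, here is a simplified model, not Kafka's actual code. It assumes the 0.10 message format numbers the inner messages of a compressed batch 0..n-1 and has the broker stamp only the wrapper with the absolute offset of the last inner message. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class RelativeOffsetSketch {

    /** A compressed batch as built by the producer; the payload itself is not modeled. */
    static final class Batch {
        final int messageCount;  // inner relative offsets are 0 .. messageCount - 1
        long wrapperOffset = -1; // set by the broker: absolute offset of the last inner message

        Batch(int messageCount) {
            this.messageCount = messageCount;
        }
    }

    /** Broker side: stamp only the wrapper, leave the compressed payload untouched. */
    static long append(Batch batch, long nextLogOffset) {
        batch.wrapperOffset = nextLogOffset + batch.messageCount - 1;
        return nextLogOffset + batch.messageCount; // next free offset in the log
    }

    /** Consumer side: recover absolute offsets from the wrapper offset and relative offsets. */
    static List<Long> absoluteOffsets(Batch batch) {
        long base = batch.wrapperOffset - (batch.messageCount - 1);
        List<Long> offsets = new ArrayList<>();
        for (int relative = 0; relative < batch.messageCount; relative++) {
            offsets.add(base + relative);
        }
        return offsets;
    }

    public static void main(String[] args) {
        Batch first = new Batch(3);
        Batch second = new Batch(2);
        long next = append(first, 100L);
        append(second, next);
        System.out.println(absoluteOffsets(first));  // prints [100, 101, 102]
        System.out.println(absoluteOffsets(second)); // prints [103, 104]
    }
}
```

In this model the broker does constant work per batch, whereas rewriting absolute offsets into each inner message would require decompressing and recompressing the payload.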
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)