[
https://issues.apache.org/jira/browse/FLINK-4035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15429144#comment-15429144
]
ASF GitHub Bot commented on FLINK-4035:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/2369#discussion_r75569873
--- Diff: flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internal;
+
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
+import org.apache.flink.util.SerializedValue;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * A fetcher that fetches data from Kafka brokers via the Kafka 0.10 consumer API.
+ *
+ * @param <T> The type of elements produced by the fetcher.
+ */
+public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
+
+	public Kafka010Fetcher(
+			SourceContext<T> sourceContext,
+			List<KafkaTopicPartition> assignedPartitions,
+			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+			StreamingRuntimeContext runtimeContext,
+			KeyedDeserializationSchema<T> deserializer,
+			Properties kafkaProperties,
+			long pollTimeout,
+			boolean useMetrics) throws Exception
+	{
+		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated,
+				runtimeContext, deserializer, kafkaProperties, pollTimeout, useMetrics);
+	}
+
+	@Override
+	protected void assignPartitionsToConsumer(KafkaConsumer<byte[], byte[]> consumer, List<TopicPartition> topicPartitions) {
+		consumer.assign(topicPartitions);
+	}
+
+	/**
+	 * Emits a record, attaching the Kafka record's timestamp.
+	 */
+	@Override
+	protected <R> void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partitionState, long offset, R kafkaRecord) throws Exception {
--- End diff --
Just realized the reason for the new `kafkaRecord` parameter here.
However, would it be better to add a `recordTimestamp` argument instead of
passing the original Kafka record?
If we don't have such a timestamp (for 0.8, 0.9), a `null` can be accepted.
In `AbstractFetcher#emitRecord()`, we would check whether the value is `null`
and correspondingly call `collect()` or `collectWithTimestamp()`. The same
check determines whether `Long.MIN_VALUE` or the actual timestamp is passed to
`emitRecordWithTimestampAndPeriodicWatermark` and
`emitRecordWithTimestampAndPunctuatedWatermark`.
IMHO, this way the new code will be more meaningful and less confusing in the
base `AbstractFetcher`, and we also won't need to override `emitRecord` in
0.10.
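To make the suggestion concrete, here is a rough sketch of the base
`emitRecord` shape I have in mind, with a nullable `Long` timestamp. The
surrounding names (`sourceContext`, `KPH`, `setOffset`) are assumptions for
illustration, not the actual `AbstractFetcher` code:

	// Sketch only: dispatch on a nullable timestamp instead of the raw Kafka record.
	protected void emitRecord(
			T record,
			KafkaTopicPartitionState<KPH> partitionState,
			long offset,
			Long recordTimestamp) throws Exception {

		if (recordTimestamp == null) {
			// 0.8 / 0.9 fetchers pass null: emit without a timestamp.
			sourceContext.collect(record);
		} else {
			// 0.10+ fetchers pass ConsumerRecord#timestamp(): attach it.
			sourceContext.collectWithTimestamp(record, recordTimestamp);
		}
		// With watermark assigners configured, the same null check would pick
		// Long.MIN_VALUE vs. the timestamp for the emitRecordWithTimestampAnd*
		// helpers instead.
		partitionState.setOffset(offset);
	}

With that shape, `Kafka010Fetcher` would only need to hand up
`record.timestamp()` from the 0.10 `ConsumerRecord`, rather than overriding
`emitRecord` itself.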
> Bump Kafka producer in Kafka sink to Kafka 0.10.0.0
> ---------------------------------------------------
>
> Key: FLINK-4035
> URL: https://issues.apache.org/jira/browse/FLINK-4035
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.0.3
> Reporter: Elias Levy
> Assignee: Robert Metzger
> Priority: Minor
>
> Kafka 0.10.0.0 introduced protocol changes related to the producer.
> Published messages now include timestamps and compressed messages now include
> relative offsets. As it is now, brokers must decompress publisher-compressed
> messages, assign offsets to them, and recompress them, which is wasteful and
> makes it less likely that compression will be used at all.