[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882262#comment-15882262 ]
ASF GitHub Bot commented on FLINK-3679:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3314#discussion_r102900986
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java ---
@@ -142,25 +141,38 @@ public void runFetchLoop() throws Exception {
 				final ConsumerRecords<byte[], byte[]> records = handover.pollNext();

 				// get the records for each topic partition
-				for (KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
+				for (final KafkaTopicPartitionState<TopicPartition> partition : subscribedPartitions()) {
 					List<ConsumerRecord<byte[], byte[]>> partitionRecords =
 							records.records(partition.getKafkaPartitionHandle());

-					for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {
-						final T value = deserializer.deserialize(
-								record.key(), record.value(),
-								record.topic(), record.partition(), record.offset());
-
-						if (deserializer.isEndOfStream(value)) {
-							// end of stream signaled
-							running = false;
-							break;
-						}
-
-						// emit the actual record. this also updates offset state atomically
-						// and deals with timestamps and watermark generation
-						emitRecord(value, partition, record.offset(), record);
+					for (final ConsumerRecord<byte[], byte[]> record : partitionRecords) {
+						final Collector<T> collector = new Collector<T>() {
+							@Override
+							public void collect(T value) {
+								if (deserializer.isEndOfStream(value)) {
+									// end of stream signaled
+									running = false;
+								} else {
+									// emit the actual record. this also updates offset state atomically
+									// and deals with timestamps and watermark generation
+									try {
+										emitRecord(value, partition, record.offset(), record);
+									} catch (Exception e) {
+										throw new RuntimeException(e);
+									}
+								}
+							}
+
+							@Override
+							public void close() {
+
+							}
+						};
+
+						deserializer.deserialize(
+								record.key(), record.value(),
+								record.topic(), record.partition(), record.offset(), collector);
--- End diff ---
The formatting for the list of arguments here could be nicer. Perhaps one
argument per line?
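For example, something along these lines (purely illustrative of the suggested
layout, reusing the call from the diff above):

    deserializer.deserialize(
            record.key(),
            record.value(),
            record.topic(),
            record.partition(),
            record.offset(),
            collector);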
> DeserializationSchema should handle zero or more outputs for every input
> ------------------------------------------------------------------------
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, Kafka Connector
> Reporter: Jamie Grier
> Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think
> should be addressed. This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes a one-to-one mapping between
> inputs and outputs. In reality, there are scenarios where one input message
> (say, from Kafka) actually maps to zero or more logical elements in the
> pipeline.
> Particularly important here is the case where you receive a message from a
> source (such as Kafka) and the raw bytes don't deserialize properly. Right
> now the only recourse is to throw an IOException and therefore fail the job.
> This is definitely not good, since bad data is a reality and failing the job
> is not the right option. If the job fails we'll just end up replaying the
> bad data and the whole thing will start again. Instead, in this case it
> would be best if the user could just return the empty set.
> The other case is where one input message should logically become multiple
> output messages. This case is probably less important, since there are other
> ways to do this, but in general it might be good to make the
> DeserializationSchema.deserialize() method return a collection rather than a
> single element.
> Maybe we need to support a DeserializationSchema variant whose semantics are
> more like those of FlatMap.
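A rough sketch of what such a flatMap-style schema could look like, modeled on
the Collector-based signature visible in the diff above. The interface and
class names here (FlatMapDeserializationSchema, SkipCorruptSchema) are
illustrative placeholders, not the final Flink API:

    import java.io.IOException;
    import java.io.Serializable;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.util.Collector;

    // Illustrative only: a schema that may emit zero or more records per
    // input message, mirroring the six-argument deserialize() in the diff.
    public interface FlatMapDeserializationSchema<T> extends Serializable {

        // Emit any number of records for one raw message. On corrupt input,
        // emit nothing instead of throwing, so bad data cannot fail the job.
        void deserialize(byte[] messageKey, byte[] message,
                String topic, int partition, long offset,
                Collector<T> out) throws IOException;

        // Signals that no further records should be read from the source.
        boolean isEndOfStream(T nextElement);
    }

    // Example implementation: parse integers, silently skipping corrupt
    // records (the "return the empty set" case from the description above).
    class SkipCorruptSchema implements FlatMapDeserializationSchema<Integer> {

        @Override
        public void deserialize(byte[] messageKey, byte[] message,
                String topic, int partition, long offset,
                Collector<Integer> out) {
            try {
                out.collect(Integer.parseInt(
                        new String(message, StandardCharsets.UTF_8).trim()));
            } catch (NumberFormatException e) {
                // bad data: emit nothing rather than failing the job
            }
        }

        @Override
        public boolean isEndOfStream(Integer nextElement) {
            return false;
        }
    }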
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)