[ https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15896252#comment-15896252 ]
ASF GitHub Bot commented on FLINK-3679:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3314#discussion_r104312079
--- Diff: flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---
@@ -419,6 +424,164 @@ public void run() {
        assertFalse("fetcher threads did not properly finish", sourceContext.isStillBlocking());
    }
+    @Test
+    public void testSkipCorruptedMessage() throws Exception {
+
+        // ----- some test data -----
+
+        final String topic = "test-topic";
+        final int partition = 3;
+        final byte[] payload = new byte[] {1, 2, 3, 4};
+
+        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
+            new ConsumerRecord<>(topic, partition, 15, payload, payload),
+            new ConsumerRecord<>(topic, partition, 16, payload, payload),
+            new ConsumerRecord<>(topic, partition, 17, payload, "end".getBytes()));
+
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> data = new HashMap<>();
+        data.put(new TopicPartition(topic, partition), records);
+
+        final ConsumerRecords<byte[], byte[]> consumerRecords = new ConsumerRecords<>(data);
+
+        // ----- the test consumer -----
+
+        final KafkaConsumer<?, ?> mockConsumer = mock(KafkaConsumer.class);
+        when(mockConsumer.poll(anyLong())).thenAnswer(new Answer<ConsumerRecords<?, ?>>() {
+            @Override
+            public ConsumerRecords<?, ?> answer(InvocationOnMock invocation) {
+                return consumerRecords;
+            }
+        });
+
+        whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+        // ----- build a fetcher -----
+
+        ArrayList<String> results = new ArrayList<>();
+        SourceContext<String> sourceContext = new CollectingSourceContext<>(results, results);
+        Map<KafkaTopicPartition, Long> partitionsWithInitialOffsets =
+            Collections.singletonMap(new KafkaTopicPartition(topic, partition), KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+        KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchema<String>() {
+
+            @Override
+            public String deserialize(byte[] messageKey, byte[] message,
+                    String topic, int partition, long offset) throws IOException {
+                // a null return signals a corrupted record that should be skipped
+                return offset == 15 ? null : new String(message);
+            }
+
+            @Override
+            public boolean isEndOfStream(String nextElement) {
+                return "end".equals(nextElement);
+            }
+
+            @Override
+            public TypeInformation<String> getProducedType() {
+                return BasicTypeInfo.STRING_TYPE_INFO;
+            }
+        };
+
+        final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+            sourceContext,
+            partitionsWithInitialOffsets,
+            null, /* periodic watermark extractor */
+            null, /* punctuated watermark extractor */
+            new TestProcessingTimeService(),
+            10, /* watermark interval */
+            this.getClass().getClassLoader(),
+            true, /* checkpointing */
+            "task_name",
+            new UnregisteredMetricsGroup(),
+            schema,
+            new Properties(),
+            0L,
+            false);
+
+        // ----- run the fetcher -----
+
+        fetcher.runFetchLoop();
+
+        // the record at offset 15 is dropped and "end" terminates the stream,
+        // so only the record at offset 16 is emitted
+        assertEquals(1, results.size());
+    }
+
+    @Test
+    public void testNullAsEOF() throws Exception {
--- End diff ---
I'm not sure this test is necessary. It essentially just verifies that the
fetch loop stops when `isEndOfStream` returns `true`; whether the end-of-stream
condition happens to be `element == null` is irrelevant to what is being tested.
We also already have `runEndOfStreamTest` in `KafkaConsumerTestBase`.
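
For reference, the skip-on-null behavior that `testSkipCorruptedMessage` exercises amounts to the fetch loop dropping any record whose deserialization returns `null`. Below is a minimal sketch of that contract, assuming a simplified loop; the helper class is hypothetical, and the real `Kafka09Fetcher` additionally tracks offsets, watermarks, and the checkpoint lock:

    import java.io.IOException;
    import java.util.List;

    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Hypothetical helper, not the actual fetcher code: emits every record
    // that deserializes to a non-null value, stops at the end-of-stream marker.
    final class SkipCorruptedSketch {

        static <T> void emitRecords(
                List<ConsumerRecord<byte[], byte[]>> records,
                KeyedDeserializationSchema<T> deserializer,
                SourceContext<T> sourceContext) throws IOException {

            for (ConsumerRecord<byte[], byte[]> record : records) {
                final T value = deserializer.deserialize(
                    record.key(), record.value(), record.topic(), record.partition(), record.offset());

                if (deserializer.isEndOfStream(value)) {
                    return; // end-of-stream marker: stop fetching
                }
                if (value != null) {
                    sourceContext.collect(value); // well-formed record
                }
                // value == null: corrupted record, skipped without failing the job
            }
        }
    }

With the schema from the test above, the record at offset 15 deserializes to null and is dropped, while the "end" payload terminates the loop before being emitted.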
> Allow Kafka consumer to skip corrupted messages
> -----------------------------------------------
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, Kafka Connector
> Reporter: Jamie Grier
> Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think
> should be improved. This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes a one-to-one mapping between
> inputs and outputs. In reality there are scenarios where one input message
> (say from Kafka) might actually map to zero or more logical elements in the
> pipeline.
> Particularly important here is the case where you receive a message from a
> source (such as Kafka) and the raw bytes don't deserialize properly. Right
> now the only recourse is to throw an IOException and therefore fail the job.
> This is definitely not good, since bad data is a reality and failing the job
> is not the right option. If the job fails we'll just end up replaying the
> bad data and the whole cycle will start again.
> Instead, in this case, it would be best if the user could simply return an
> empty set.
> The other case is where one input message should logically become multiple
> output messages. This case is probably less important, since there are other
> ways to achieve it, but in general it might be good to make the
> DeserializationSchema.deserialize() method return a collection rather than a
> single element.
> Maybe we need to support a DeserializationSchema variant with semantics more
> like those of FlatMap.
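
To make that last suggestion concrete, a collector-based variant along the lines the description hints at might look like the sketch below. The interface name `FlatDeserializationSchema` and its shape are assumptions for illustration only, not an API Flink actually ships:

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.flink.util.Collector;

    // Hypothetical FlatMap-style variant of DeserializationSchema: one raw
    // Kafka message may map to zero elements (bad data) or to several.
    interface FlatDeserializationSchema<T> extends Serializable {

        void deserialize(byte[] messageKey, byte[] message,
                String topic, int partition, long offset,
                Collector<T> out) throws IOException;
    }

    // Example implementation: drop undecodable payloads instead of throwing
    // an IOException that would fail (and endlessly restart) the job.
    class SkippingStringSchema implements FlatDeserializationSchema<String> {

        @Override
        public void deserialize(byte[] messageKey, byte[] message,
                String topic, int partition, long offset, Collector<String> out) {
            if (message != null && message.length > 0) {
                out.collect(new String(message)); // one input -> one output
            }
            // corrupted or empty record: emit nothing, the pipeline keeps running
        }
    }

A fetcher driving such an interface could pass its source context through as the collector, so zero-or-more outputs per input would fall out naturally.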