[
https://issues.apache.org/jira/browse/FLINK-3679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882265#comment-15882265
]
ASF GitHub Bot commented on FLINK-3679:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3314#discussion_r102898307
--- Diff:
flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
---
@@ -422,6 +429,99 @@ public void run() {
assertFalse("fetcher threads did not properly finish",
sourceContext.isStillBlocking());
}
+ @Test
+ public void testRichDeserializationSchema() throws Exception {
+ final String topic = "test-topic";
+ final int partition = 3;
+ final byte[] payload = new byte[] {1, 2, 3, 4};
+ final byte[] endPayload =
"end".getBytes(StandardCharsets.UTF_8);
+
+ final List<ConsumerRecord<byte[], byte[]>> records =
Arrays.asList(
+ new ConsumerRecord<>(topic, partition, 15, payload,
payload),
+ new ConsumerRecord<>(topic, partition, 16, payload,
payload),
+ new ConsumerRecord<>(topic, partition, 17, payload,
endPayload));
+
+ final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>
data = new HashMap<>();
+ data.put(new TopicPartition(topic, partition), records);
+
+ final ConsumerRecords<byte[], byte[]> consumerRecords = new
ConsumerRecords<>(data);
+
+ // ----- the test consumer -----
+
+ final KafkaConsumer<?, ?> mockConsumer =
mock(KafkaConsumer.class);
+ when(mockConsumer.poll(anyLong())).thenAnswer(new
Answer<ConsumerRecords<?, ?>>() {
+ @Override
+ public ConsumerRecords<?, ?> answer(InvocationOnMock
invocation) {
+ return consumerRecords;
+ }
+ });
+
+
whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer);
+
+ // ----- build a fetcher -----
+
+ ArrayList<String> results = new ArrayList<>();
+ SourceContext<String> sourceContext = new
CollectingSourceContext<>(results, results);
+ List<KafkaTopicPartition> topics =
Collections.singletonList(new KafkaTopicPartition(topic, partition));
+ RichKeyedDeserializationSchema<String> schema = new
RichKeyedDeserializationSchema<String>() {
+ @Override
+ public void deserialize(
+ byte[] messageKey, byte[] message, String
topic, int partition,
+ long offset, Collector<String> collector)
throws IOException {
+ if (offset != 16) {
+ collector.collect(new String(message));
+ }
+ }
+
+ @Override
+ public boolean isEndOfStream(String nextElement) {
+ return nextElement.equals("end");
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+ };
+
+ final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
+ sourceContext,
+ topics,
+ null, /* no restored state */
+ null, /* periodic watermark extractor */
+ null, /* punctuated watermark extractor */
+ new TestProcessingTimeService(),
+ 10, /* watermark interval */
+ this.getClass().getClassLoader(),
+ false, /* checkpointing */
+ "task_name",
+ new UnregisteredMetricsGroup(),
+ schema,
+ new Properties(),
+ 0L,
+ StartupMode.GROUP_OFFSETS,
+ false);
+
+
+ // ----- run the fetcher -----
+
+ final AtomicReference<Throwable> error = new
AtomicReference<>();
+ final Thread fetcherRunner = new Thread("fetcher runner") {
--- End diff --
We have a nice utility `CheckedThread` that serves for the tested purpose
here (catching errors and storing its reference).
> DeserializationSchema should handle zero or more outputs for every input
> ------------------------------------------------------------------------
>
> Key: FLINK-3679
> URL: https://issues.apache.org/jira/browse/FLINK-3679
> Project: Flink
> Issue Type: Bug
> Components: DataStream API, Kafka Connector
> Reporter: Jamie Grier
> Assignee: Haohui Mai
>
> There are a couple of issues with the DeserializationSchema API that I think
> should be improved. This request has come to me via an existing Flink user.
> The main issue is simply that the API assumes that there is a one-to-one
> mapping between input and outputs. In reality there are scenarios where one
> input message (say from Kafka) might actually map to zero or more logical
> elements in the pipeline.
> Particularly important here is the case where you receive a message from a
> source (such as Kafka) and say the raw bytes don't deserialize properly.
> Right now the only recourse is to throw IOException and therefore fail the
> job.
> This is definitely not good since bad data is a reality and failing the job
> is not the right option. If the job fails we'll just end up replaying the
> bad data and the whole thing will start again.
> Instead in this case it would be best if the user could just return the empty
> set.
> The other case is where one input message should logically be multiple output
> messages. This case is probably less important since there are other ways to
> do this but in general it might be good to make the
> DeserializationSchema.deserialize() method return a collection rather than a
> single element.
> Maybe we need to support a DeserializationSchema variant that has semantics
> more like that of FlatMap.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)