This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9628 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
commit 5fb7a41705e46244cb196736ddfbdc58bcc533d1 Author: Christian Schneider <[email protected]> AuthorDate: Tue Aug 4 16:43:19 2020 +0200 SLING-9628 - Allow to receive more than one message type --- .../journal/kafka/JsonRecordHandler.java | 12 ++--- .../journal/kafka/KafkaClientProvider.java | 3 +- .../journal/kafka/KafkaJsonMessageSender.java | 19 +++++++- .../journal/kafka/KafkaMessageInfo.java | 1 + .../distribution/journal/kafka/KafkaPoller.java | 55 +++++++++++++++++----- .../journal/kafka/KafkaPollerTest.java | 39 ++++++++++++--- .../sling/distribution/journal/kafka/Person.java | 25 ++++++++++ 7 files changed, 125 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java index 14fd391..ab29b43 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java @@ -29,24 +29,22 @@ import org.apache.sling.distribution.journal.MessageInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; public class JsonRecordHandler<T> implements Consumer<ConsumerRecord<String, String>> { private static final Logger LOG = LoggerFactory.getLogger(JsonRecordHandler.class); - private final MessageHandler<T> handler; + public final MessageHandler<T> handler; - private final ObjectReader reader; + public final ObjectReader reader; - public JsonRecordHandler(MessageHandler<T> handler, Class<T> clazz) { + public JsonRecordHandler(MessageHandler<T> handler, ObjectReader reader) { + this.reader = reader; this.handler = requireNonNull(handler); - ObjectMapper mapper = new ObjectMapper(); - reader = mapper.readerFor(requireNonNull(clazz)); + } - @Override public void accept(ConsumerRecord<String, String> record) { MessageInfo info = new KafkaMessageInfo(record); String payload = record.value(); diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java index 2a4391d..ea4b968 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java @@ -39,6 +39,7 @@ import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM; import java.io.Closeable; import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.List; @@ -136,7 +137,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable { } else { consumer.seekToEnd(topicPartitions); } - return KafkaPoller.createJsonPoller(consumer, eventSender, adapters); + return new KafkaPoller(consumer, eventSender, Arrays.asList(adapters)); } @Override diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java index c5e3018..6080b41 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java @@ -22,12 +22,18 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION; +import java.nio.charset.Charset; import java.util.Collections; +import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.sling.distribution.journal.ExceptionEventSender; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingException; @@ -69,7 +75,10 @@ public class KafkaJsonMessageSender<T> implements MessageSender<T> { try { ObjectWriter writer = mapper.writerFor(payload.getClass()); String payloadSt = writer.writeValueAsString(payload); - ProducerRecord<String, String> record = new ProducerRecord<>(topic, PARTITION, null, payloadSt); + List<Header> headerList = properties.entrySet().stream().map(this::toHeader).collect(Collectors.toList()); + RecordHeader messageType = header(KafkaMessageInfo.KEY_MESSAGE_TYPE, payload.getClass().getSimpleName()); + headerList.add(messageType); + ProducerRecord<String, String> record = new ProducerRecord<>(topic, PARTITION, null, payloadSt, headerList); RecordMetadata metadata = producer.send(record).get(); LOG.info("Sent to topic={}, offset={}", topic, metadata.offset()); } catch (Exception e) { @@ -78,4 +87,12 @@ public class KafkaJsonMessageSender<T> implements MessageSender<T> { } } + private Header toHeader(Entry<String, String> entry) { + return header(entry.getKey(), entry.getValue()); + } + + + private RecordHeader header(String key, String value) { + return new RecordHeader(key, value.getBytes(Charset.forName("utf-8"))); + } } diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java index 542a152..7642206 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.sling.distribution.journal.MessageInfo; public class KafkaMessageInfo implements MessageInfo { + public static final String KEY_MESSAGE_TYPE = "messageType"; private final String topic; private final int partition; diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java index 54b8a73..e48a721 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java @@ -24,46 +24,60 @@ import static org.apache.sling.distribution.journal.RunnableUtil.startBackground import java.io.Closeable; import java.io.IOException; +import java.nio.charset.Charset; +import java.util.Iterator; +import java.util.List; +import java.util.Map; import java.util.function.Consumer; +import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; import org.apache.sling.distribution.journal.ExceptionEventSender; import org.apache.sling.distribution.journal.HandlerAdapter; +import org.apache.sling.distribution.journal.MessagingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class KafkaPoller<T> implements Closeable { +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; + +public class KafkaPoller implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(KafkaPoller.class); private static final long ERROR_SLEEP_MS = 10000; - private final KafkaConsumer<String, T> consumer; + private final KafkaConsumer<String, String> consumer; - private final Consumer<ConsumerRecord<String, T>> handler; + private final Map<String, Consumer<ConsumerRecord<String, String>>> handlers; private final ExceptionEventSender eventSender; + private final ObjectMapper mapper; + private volatile boolean running = true; long errorSleepMs; - public KafkaPoller(KafkaConsumer<String, T> consumer, ExceptionEventSender eventSender, Consumer<ConsumerRecord<String, T>> handler) { - this.handler = handler; + + public KafkaPoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, List<HandlerAdapter<?>> adapters) { this.consumer = requireNonNull(consumer); this.eventSender = requireNonNull(eventSender); this.errorSleepMs = ERROR_SLEEP_MS; + mapper = new ObjectMapper(); + this.handlers = adapters.stream() + .collect(Collectors.toMap(adapter -> adapter.getType().getSimpleName(), this::toHandler)); startBackgroundThread(this::run, "Message Poller"); } - @SuppressWarnings({ "rawtypes", "unchecked" }) - public static Closeable createJsonPoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, HandlerAdapter<?> ...adapters) { - HandlerAdapter<?> adapter = adapters[0]; - return new KafkaPoller<>(consumer, eventSender, new JsonRecordHandler(adapter.getHandler(), adapter.getType())); + <T> Consumer<ConsumerRecord<String, String>> toHandler(HandlerAdapter<T> adapter) { + ObjectReader reader = mapper.readerFor(adapter.getType()); + return new JsonRecordHandler<T>(adapter.getHandler(), reader); } - + @Override public void close() throws IOException { LOG.info("Shutdown poller"); @@ -90,12 +104,27 @@ public class KafkaPoller<T> implements Closeable { LOG.info("Stopped poller"); } - public void handle(ConsumerRecord<String, T> record) { + public void handle(ConsumerRecord<String, String> record) { try { - handler.accept(record); + String messageType = getMessageType(record); + Consumer<ConsumerRecord<String, String>> handler = handlers.get(messageType); + if (handler != null) { + handler.accept(record); + } else { + LOG.info("No handler for messageType={}. Ignoring message.", messageType); + } } catch (Exception e) { - LOG.warn("Error consuming message {}", record.headers()); + LOG.warn("Error consuming message {}", record.headers(), e); + } + } + + private String getMessageType(ConsumerRecord<String, String> record) { + Iterator<Header> headers = record.headers().headers(KafkaMessageInfo.KEY_MESSAGE_TYPE).iterator(); + if (!headers.hasNext()) { + throw new MessagingException("Header " + KafkaMessageInfo.KEY_MESSAGE_TYPE + " missing."); } + Header messageTypeHeader = headers.next(); + return new String(messageTypeHeader.value(), Charset.forName("utf-8")); } private void sleepAfterError() { diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java index 93778d2..6061d8f 100644 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java @@ -23,11 +23,12 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.nio.charset.Charset; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.function.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -35,13 +36,23 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.record.TimestampType; import org.apache.sling.distribution.journal.ExceptionEventSender; +import org.apache.sling.distribution.journal.HandlerAdapter; +import org.apache.sling.distribution.journal.MessageHandler; +import org.apache.sling.distribution.journal.MessageInfo; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.runners.MockitoJUnitRunner; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + @RunWith(MockitoJUnitRunner.class) public class KafkaPollerTest { @@ -52,25 +63,39 @@ public class KafkaPollerTest { private KafkaConsumer<String, String> consumer; @Mock - private Consumer<ConsumerRecord<String, String>> handler; + private MessageHandler<Person> handler; - @SuppressWarnings("unchecked") @Test public void testHandleError() throws Exception { - ConsumerRecord<String, String> record = new ConsumerRecord<String, String>("topic", 1, 0l, "", ""); + Person person = new Person(); + person.name = "Chris"; + ConsumerRecord<String, String> record = createRecordFor(person); when(consumer.poll(Mockito.any())) .thenReturn(records(Arrays.asList(record))) .thenThrow(new KafkaException("Expected")) .thenThrow(new WakeupException()); - doThrow(new RuntimeException("Expected")).when(handler).accept(Mockito.any(ConsumerRecord.class)); - KafkaPoller<String> poller = new KafkaPoller<String>(consumer, eventSender, handler); + doThrow(new RuntimeException("Expected")).when(handler).handle(Mockito.any(MessageInfo.class), Mockito.any(Person.class)); + List<HandlerAdapter<?>> adapters = Collections.singletonList(HandlerAdapter.create(Person.class, handler)); + KafkaPoller poller = new KafkaPoller(consumer, eventSender, adapters); poller.errorSleepMs = 100; // Should see "Error consuming message" in the log - verify(handler, timeout(1000)).accept(Mockito.any(ConsumerRecord.class)); + verify(handler, timeout(1000)).handle(Mockito.any(MessageInfo.class), Mockito.any(Person.class)); verify(eventSender, timeout(1000)).send(Mockito.any(KafkaException.class)); verify(consumer, timeout(1000)).close(); poller.close(); } + + private ConsumerRecord<String, String> createRecordFor(Person person) throws JsonProcessingException { + Headers headers = new RecordHeaders(Collections.singleton(header(KafkaMessageInfo.KEY_MESSAGE_TYPE, Person.class.getSimpleName()))); + ObjectMapper mapper = new ObjectMapper(); + String value = mapper.writerFor(Person.class).writeValueAsString(person); + return new ConsumerRecord<String, String>( + "topic", 1, 0l, 0l, TimestampType.CREATE_TIME, 0l, 0, 0, "", value, headers); + } + + private RecordHeader header(String key, String value) { + return new RecordHeader(key, value.getBytes(Charset.forName("utf-8"))); + } private ConsumerRecords<String, String> records(List<ConsumerRecord<String, String>> records) { Map<TopicPartition, List<ConsumerRecord<String, String>>> rm = new HashMap<>(); diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/Person.java b/src/test/java/org/apache/sling/distribution/journal/kafka/Person.java new file mode 100644 index 0000000..f50653b --- /dev/null +++ b/src/test/java/org/apache/sling/distribution/journal/kafka/Person.java @@ -0,0 +1,25 @@ +/************************************************************************* +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.sling.distribution.journal.kafka; + +public class Person { + public String name; + public int age; +} \ No newline at end of file
