This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9628
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit 5fb7a41705e46244cb196736ddfbdc58bcc533d1
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Aug 4 16:43:19 2020 +0200

    SLING-9628 - Allow to receive more than one message type
---
 .../journal/kafka/JsonRecordHandler.java           | 12 ++---
 .../journal/kafka/KafkaClientProvider.java         |  3 +-
 .../journal/kafka/KafkaJsonMessageSender.java      | 19 +++++++-
 .../journal/kafka/KafkaMessageInfo.java            |  1 +
 .../distribution/journal/kafka/KafkaPoller.java    | 55 +++++++++++++++++-----
 .../journal/kafka/KafkaPollerTest.java             | 39 ++++++++++++---
 .../sling/distribution/journal/kafka/Person.java   | 25 ++++++++++
 7 files changed, 125 insertions(+), 29 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
index 14fd391..ab29b43 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/JsonRecordHandler.java
@@ -29,24 +29,22 @@ import org.apache.sling.distribution.journal.MessageInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectReader;
 
 public class JsonRecordHandler<T> implements Consumer<ConsumerRecord<String, 
String>> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JsonRecordHandler.class);
 
-    private final MessageHandler<T> handler;
+    public final MessageHandler<T> handler;
 
-    private final ObjectReader reader;
+    public final ObjectReader reader;
 
-    public JsonRecordHandler(MessageHandler<T> handler, Class<T> clazz) {
+    public JsonRecordHandler(MessageHandler<T> handler, ObjectReader reader) {
+        this.reader = reader;
         this.handler = requireNonNull(handler);
-        ObjectMapper mapper = new ObjectMapper();
-        reader = mapper.readerFor(requireNonNull(clazz));
+        
     }
 
-    @Override
     public void accept(ConsumerRecord<String, String> record) {
         MessageInfo info = new KafkaMessageInfo(record);
         String payload = record.value();
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index 2a4391d..ea4b968 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -39,6 +39,7 @@ import static 
org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -136,7 +137,7 @@ public class KafkaClientProvider implements 
MessagingProvider, Closeable {
         } else {
             consumer.seekToEnd(topicPartitions);
         }
-        return KafkaPoller.createJsonPoller(consumer, eventSender, adapters);
+        return new KafkaPoller(consumer, eventSender, Arrays.asList(adapters));
     }
 
     @Override
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
index c5e3018..6080b41 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
@@ -22,12 +22,18 @@ import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION;
 
+import java.nio.charset.Charset;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingException;
@@ -69,7 +75,10 @@ public class KafkaJsonMessageSender<T> implements 
MessageSender<T> {
         try {
             ObjectWriter writer = mapper.writerFor(payload.getClass());
             String payloadSt = writer.writeValueAsString(payload);
-            ProducerRecord<String, String> record = new 
ProducerRecord<>(topic, PARTITION, null, payloadSt);
+            List<Header> headerList = 
properties.entrySet().stream().map(this::toHeader).collect(Collectors.toList());
+            RecordHeader messageType = 
header(KafkaMessageInfo.KEY_MESSAGE_TYPE, payload.getClass().getSimpleName());
+            headerList.add(messageType);
+            ProducerRecord<String, String> record = new 
ProducerRecord<>(topic, PARTITION, null, payloadSt, headerList);
             RecordMetadata metadata = producer.send(record).get();
             LOG.info("Sent to topic={}, offset={}", topic, metadata.offset());
         } catch (Exception e) {
@@ -78,4 +87,12 @@ public class KafkaJsonMessageSender<T> implements 
MessageSender<T> {
         }
     }
 
+    private Header toHeader(Entry<String, String> entry) {
+        return header(entry.getKey(), entry.getValue());
+    }
+
+
+    private RecordHeader header(String key, String value) {
+        return new RecordHeader(key, value.getBytes(Charset.forName("utf-8")));
+    }
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
index 542a152..7642206 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.sling.distribution.journal.MessageInfo;
 
 public class KafkaMessageInfo implements MessageInfo {
+    public static final String KEY_MESSAGE_TYPE = "messageType";
 
     private final String topic;
     private final int partition;
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
index 54b8a73..e48a721 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
@@ -24,46 +24,60 @@ import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackground
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessagingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaPoller<T> implements Closeable {
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+
+public class KafkaPoller implements Closeable {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaPoller.class);
 
     private static final long ERROR_SLEEP_MS = 10000;
 
-    private final KafkaConsumer<String, T> consumer;
+    private final KafkaConsumer<String, String> consumer;
 
-    private final Consumer<ConsumerRecord<String, T>> handler;
+    private final Map<String, Consumer<ConsumerRecord<String, String>>> 
handlers;
     
     private final ExceptionEventSender eventSender;
     
+    private final ObjectMapper mapper;
+
     private volatile boolean running = true;
 
     long errorSleepMs;
 
-    public KafkaPoller(KafkaConsumer<String, T> consumer, ExceptionEventSender 
eventSender, Consumer<ConsumerRecord<String, T>> handler) {
-        this.handler = handler;
+
+    public KafkaPoller(KafkaConsumer<String, String> consumer, 
ExceptionEventSender eventSender, List<HandlerAdapter<?>> adapters) {
         this.consumer = requireNonNull(consumer);
         this.eventSender = requireNonNull(eventSender);
         this.errorSleepMs = ERROR_SLEEP_MS;
+        mapper = new ObjectMapper();
+        this.handlers = adapters.stream()
+            .collect(Collectors.toMap(adapter -> 
adapter.getType().getSimpleName(), this::toHandler));
         startBackgroundThread(this::run, "Message Poller");
     }
     
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    public static Closeable createJsonPoller(KafkaConsumer<String, String> 
consumer, ExceptionEventSender eventSender, HandlerAdapter<?> ...adapters) {
-        HandlerAdapter<?> adapter = adapters[0];
-        return new KafkaPoller<>(consumer, eventSender, new 
JsonRecordHandler(adapter.getHandler(), adapter.getType()));
+    <T> Consumer<ConsumerRecord<String, String>> toHandler(HandlerAdapter<T> 
adapter) {
+        ObjectReader reader = mapper.readerFor(adapter.getType());
+        return new JsonRecordHandler<T>(adapter.getHandler(), reader);
     }
-
+    
     @Override
     public void close() throws IOException {
         LOG.info("Shutdown poller");
@@ -90,12 +104,27 @@ public class KafkaPoller<T> implements Closeable {
         LOG.info("Stopped poller");
     }
     
-    public void handle(ConsumerRecord<String, T> record) {
+    public void handle(ConsumerRecord<String, String> record) {
         try {
-            handler.accept(record);
+            String messageType = getMessageType(record);
+            Consumer<ConsumerRecord<String, String>> handler = 
handlers.get(messageType);
+            if (handler != null) {
+                handler.accept(record);
+            } else {
+                LOG.info("No handler for messageType={}. Ignoring message.", 
messageType);
+            }
         } catch (Exception e) {
-            LOG.warn("Error consuming message {}", record.headers());
+            LOG.warn("Error consuming message {}", record.headers(), e);
+        }
+    }
+
+    private String getMessageType(ConsumerRecord<String, String> record) {
+        Iterator<Header> headers = 
record.headers().headers(KafkaMessageInfo.KEY_MESSAGE_TYPE).iterator();
+        if (!headers.hasNext()) {
+            throw new MessagingException("Header " + 
KafkaMessageInfo.KEY_MESSAGE_TYPE + " missing.");
         }
+        Header messageTypeHeader = headers.next();
+        return new String(messageTypeHeader.value(), Charset.forName("utf-8"));
     }
 
     private void sleepAfterError() {
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
index 93778d2..6061d8f 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaPollerTest.java
@@ -23,11 +23,12 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.nio.charset.Charset;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Consumer;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -35,13 +36,23 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.runners.MockitoJUnitRunner;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
 @RunWith(MockitoJUnitRunner.class)
 public class KafkaPollerTest {
 
@@ -52,25 +63,39 @@ public class KafkaPollerTest {
     private KafkaConsumer<String, String> consumer;
 
     @Mock
-    private Consumer<ConsumerRecord<String, String>> handler;
+    private MessageHandler<Person> handler;
 
-    @SuppressWarnings("unchecked")
     @Test
     public void testHandleError() throws Exception {
-        ConsumerRecord<String, String> record = new ConsumerRecord<String, 
String>("topic", 1, 0l, "", "");
+        Person person = new Person();
+        person.name = "Chris";
+        ConsumerRecord<String, String> record = createRecordFor(person);
         when(consumer.poll(Mockito.any()))
             .thenReturn(records(Arrays.asList(record)))
             .thenThrow(new KafkaException("Expected"))
             .thenThrow(new WakeupException());
-        doThrow(new 
RuntimeException("Expected")).when(handler).accept(Mockito.any(ConsumerRecord.class));
-        KafkaPoller<String> poller = new KafkaPoller<String>(consumer, 
eventSender, handler);
+        doThrow(new 
RuntimeException("Expected")).when(handler).handle(Mockito.any(MessageInfo.class),
 Mockito.any(Person.class));
+        List<HandlerAdapter<?>> adapters = 
Collections.singletonList(HandlerAdapter.create(Person.class, handler));
+        KafkaPoller poller = new KafkaPoller(consumer, eventSender, adapters);
         poller.errorSleepMs = 100;
         // Should see "Error consuming message" in the log
-        verify(handler, 
timeout(1000)).accept(Mockito.any(ConsumerRecord.class));
+        verify(handler, timeout(1000)).handle(Mockito.any(MessageInfo.class), 
Mockito.any(Person.class));
         verify(eventSender, 
timeout(1000)).send(Mockito.any(KafkaException.class));
         verify(consumer, timeout(1000)).close();
         poller.close();
     }
+
+    private ConsumerRecord<String, String> createRecordFor(Person person) 
throws JsonProcessingException {
+        Headers headers = new 
RecordHeaders(Collections.singleton(header(KafkaMessageInfo.KEY_MESSAGE_TYPE, 
Person.class.getSimpleName())));
+        ObjectMapper mapper = new ObjectMapper();
+        String value = 
mapper.writerFor(Person.class).writeValueAsString(person);
+        return new ConsumerRecord<String, String>(
+                "topic", 1, 0l, 0l, TimestampType.CREATE_TIME, 0l, 0, 0, "", 
value, headers);
+    }
+
+    private RecordHeader header(String key, String value) {
+        return new RecordHeader(key, value.getBytes(Charset.forName("utf-8")));
+    }
     
     private ConsumerRecords<String, String> 
records(List<ConsumerRecord<String, String>> records) {
         Map<TopicPartition, List<ConsumerRecord<String, String>>> rm = new 
HashMap<>();
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/Person.java 
b/src/test/java/org/apache/sling/distribution/journal/kafka/Person.java
new file mode 100644
index 0000000..f50653b
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/kafka/Person.java
@@ -0,0 +1,25 @@
+/*************************************************************************
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+public class Person {
+    public String name;
+    public int age;
+}
\ No newline at end of file

Reply via email to