This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-9504
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit d7008b13367ae882a8df4b77ff78df2324289eca
Author: Christian Schneider <[email protected]>
AuthorDate: Mon Jun 15 18:40:58 2020 +0200

    SLING-9504 - Switch from protobuf to json
---
 pom.xml                                            |   5 +-
 .../journal/kafka/KafkaClientProvider.java         |  56 ++---------
 .../journal/kafka/KafkaJsonMessageSender.java      |  21 ++++-
 .../journal/kafka/KafkaMessageInfo.java            |  15 +++
 .../journal/kafka/KafkaMessageSender.java          |  89 ------------------
 .../distribution/journal/kafka/KafkaPoller.java    |  11 +--
 .../journal/kafka/ProtobufRecordHandler.java       |  86 -----------------
 .../journal/kafka/JsonMessagingTest.java           | 102 ---------------------
 .../journal/kafka/KafkaJsonMessageSenderTest.java  |   4 +-
 .../journal/kafka/KafkaMessageSenderTest.java      |  73 ---------------
 .../distribution/journal/kafka/MessagingTest.java  |  60 ++++++------
 .../journal/kafka/ProtobufRecordHandlerTest.java   |  53 -----------
 .../journal/kafka/util/KafkaLocal.java             |   1 +
 13 files changed, 75 insertions(+), 501 deletions(-)

diff --git a/pom.xml b/pom.xml
index 015966e..112e52f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -34,7 +34,7 @@
     <!-- P R O J E C T                                                         
  -->
     <!-- 
======================================================================= -->
     <artifactId>org.apache.sling.distribution.journal.kafka</artifactId>
-    <version>0.1.5-SNAPSHOT</version>
+    <version>0.1.5-JSON-SNAPSHOT</version>
 
     <name>Apache Sling Journal Messaging based on Apache Kafka</name>
     <description>Implementation of Apache Sling Content Distribution Journal 
Messaging based on Apache Kafka.</description>
@@ -70,13 +70,12 @@
         <dependency>
             <groupId>org.apache.sling</groupId>
             
<artifactId>org.apache.sling.distribution.journal.messages</artifactId>
-            <version>0.1.2</version>
+            <version>0.1.7-JSON-SNAPSHOT</version>
         </dependency>
         <!-- OSGi -->
         <dependency>
             <groupId>org.osgi</groupId>
             <artifactId>org.osgi.service.component.annotations</artifactId>
-            <version>1.3.0</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
index 199e7fe..2a4391d 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -52,15 +52,11 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.JsonMessageSender;
-import org.apache.sling.distribution.journal.MessageHandler;
 import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.apache.sling.distribution.journal.MessagingProvider;
@@ -75,13 +71,11 @@ import org.osgi.service.metatype.annotations.Designate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.protobuf.GeneratedMessage;
-
 @Component(service = MessagingProvider.class, configurationPolicy = 
ConfigurationPolicy.REQUIRE)
 @Designate(ocd = KafkaEndpoint.class)
 public class KafkaClientProvider implements MessagingProvider, Closeable {
 
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaClientProvider.class);
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
 
     public static final int PARTITION = 0;
     
@@ -124,18 +118,14 @@ public class KafkaClientProvider implements 
MessagingProvider, Closeable {
     }
 
     @Override
-    public <T extends GeneratedMessage> MessageSender<T> createSender() {
-        return new KafkaMessageSender<>(buildKafkaProducer(), eventSender);
-    }
-
-    @Override
-    public <T> Closeable createPoller(String topicName, Reset reset, 
HandlerAdapter<?>... adapters) {
-        return createPoller(topicName, reset, null, adapters);
+    public <T> MessageSender<T> createSender(String topic) {
+        return new KafkaJsonMessageSender<>(buildJsonKafkaProducer(), topic, 
eventSender);
     }
 
     @Override
     public Closeable createPoller(String topicName, Reset reset, @Nullable 
String assign, HandlerAdapter<?>... adapters) {
-        KafkaConsumer<String, byte[]> consumer = 
createConsumer(ByteArrayDeserializer.class, reset);
+        log.info("Creating poller for topic={}, reset={}, assign={} with 
adapters {}.", topicName, reset, assign, adapters);
+        KafkaConsumer<String, String> consumer = 
createConsumer(StringDeserializer.class, reset);
         TopicPartition topicPartition = new TopicPartition(topicName, 
PARTITION);
         Collection<TopicPartition> topicPartitions = singleton(topicPartition);
         consumer.assign(topicPartitions);
@@ -146,28 +136,7 @@ public class KafkaClientProvider implements 
MessagingProvider, Closeable {
         } else {
             consumer.seekToEnd(topicPartitions);
         }
-        Closeable poller = KafkaPoller.createProtobufPoller(consumer, 
eventSender, adapters);
-        LOG.info("Created poller for reset {}, topicName {}, assign {}", 
reset, topicName, assign);
-        return poller;
-    }
-
-    @Override
-    public <T> JsonMessageSender<T> createJsonSender() {
-        return new KafkaJsonMessageSender<>(buildJsonKafkaProducer(), 
eventSender);
-    }
-
-    @Override
-    public <T> Closeable createJsonPoller(String topicName, Reset reset, 
MessageHandler<T> handler, Class<T> type) {
-        KafkaConsumer<String, String> consumer = 
createConsumer(StringDeserializer.class, reset);
-        TopicPartition topicPartition = new TopicPartition(topicName, 
PARTITION);
-        Collection<TopicPartition> topicPartitions = singleton(topicPartition);
-        consumer.assign(topicPartitions);
-        if (reset == Reset.earliest) {
-            consumer.seekToBeginning(topicPartitions);
-        } else {
-            consumer.seekToEnd(topicPartitions);
-        }
-        return KafkaPoller.createJsonPoller(consumer, eventSender, handler, 
type);
+        return KafkaPoller.createJsonPoller(consumer, eventSender, adapters);
     }
 
     @Override
@@ -223,17 +192,6 @@ public class KafkaClientProvider implements 
MessagingProvider, Closeable {
     }
 
     @Nonnull
-    private synchronized KafkaProducer<String, byte[]> buildKafkaProducer() {
-        if (rawProducer == null) {
-            try (CLSwitch switcher = new 
CLSwitch(KafkaProducer.class.getClassLoader())) {
-                rawProducer = new 
KafkaProducer<>(producerConfig(ByteArraySerializer.class));
-            }
-            rawProducer = new 
KafkaProducer<>(producerConfig(ByteArraySerializer.class));
-        }
-        return rawProducer;
-    }
-
-    @Nonnull
     private synchronized KafkaProducer<String, String> 
buildJsonKafkaProducer() {
         if (jsonProducer == null) {
             jsonProducer = new 
KafkaProducer<>(producerConfig(StringSerializer.class));
@@ -277,6 +235,4 @@ public class KafkaClientProvider implements 
MessagingProvider, Closeable {
         }
         return Long.parseLong(chunks[1]);
     }
-
-
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
index 9951a79..ade6833 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
@@ -22,11 +22,14 @@ import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION;
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.JsonMessageSender;
+import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +37,7 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.ObjectWriter;
 
-public class KafkaJsonMessageSender<T> implements JsonMessageSender<T> {
+public class KafkaJsonMessageSender<T> implements MessageSender<T> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaJsonMessageSender.class);
 
@@ -44,16 +47,25 @@ public class KafkaJsonMessageSender<T> implements 
JsonMessageSender<T> {
 
     private final ExceptionEventSender eventSender;
 
-    public KafkaJsonMessageSender(KafkaProducer<String, String> producer, 
ExceptionEventSender eventSender) {
+    private final String topic;
+
+    public KafkaJsonMessageSender(KafkaProducer<String, String> producer, 
String topic, ExceptionEventSender eventSender) {
+        this.topic = topic;
         this.eventSender = eventSender;
         this.producer = requireNonNull(producer);
     }
 
+    
+    @Override
+    public void send(T payload) throws MessagingException {
+        send(payload, Collections.emptyMap());
+    }
+    
     /**
      * {@inheritDoc}
      */
     @Override
-    public void send(String topic, T payload) {
+    public void send(T payload, Map<String, String> properties) throws 
MessagingException {
         try {
             ObjectWriter writer = mapper.writerFor(payload.getClass());
             String payloadSt = writer.writeValueAsString(payload);
@@ -65,4 +77,5 @@ public class KafkaJsonMessageSender<T> implements 
JsonMessageSender<T> {
             throw new MessagingException(format("Failed to send JSON message 
on topic %s", topic), e);
         }
     }
+
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
index 596ecc5..542a152 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
@@ -18,6 +18,9 @@
  */
 package org.apache.sling.distribution.journal.kafka;
 
+import java.util.Collections;
+import java.util.Map;
+
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.sling.distribution.journal.MessageInfo;
 
@@ -27,8 +30,14 @@ public class KafkaMessageInfo implements MessageInfo {
     private final int partition;
     private final long offset;
     private final long createTime;
+    private final Map<String, String> props;
 
     public KafkaMessageInfo(ConsumerRecord<String, ?> record) {
+        this(record, Collections.emptyMap());
+    }
+    
+    public KafkaMessageInfo(ConsumerRecord<String, ?> record, Map<String, 
String> props) {
+        this.props = props;
         this.topic = record.topic();
         this.partition = record.partition();
         this.offset = record.offset();
@@ -52,8 +61,14 @@ public class KafkaMessageInfo implements MessageInfo {
     }
 
     @Override
+    public Map<String, String> getProps() {
+        return props;
+    }
+
+    @Override
     public String toString() {
         return String.format("Topic: %s, Partition: %d, Offset: %d, 
CreateTime: %d", 
                 topic, partition, offset, createTime);
     }
+
 }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
deleted file mode 100644
index 70e5a6a..0000000
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.kafka;
-
-import java.util.Arrays;
-
-import org.apache.sling.distribution.journal.messages.Types;
-import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.MessageSender;
-import org.apache.sling.distribution.journal.MessagingException;
-import com.google.protobuf.GeneratedMessage;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.internals.RecordHeader;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static java.lang.String.format;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static java.util.Objects.requireNonNull;
-import static 
org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION;
-
-public class KafkaMessageSender<T extends GeneratedMessage> implements 
MessageSender<T> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaMessageSender.class);
-
-    private final KafkaProducer<String, byte[]> producer;
-
-    private final ExceptionEventSender eventSender;
-
-    public KafkaMessageSender(KafkaProducer<String, byte[]> producer, 
ExceptionEventSender eventSender) {
-        this.eventSender = requireNonNull(eventSender);
-        this.producer = requireNonNull(producer);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void send(String topic, T payload) {
-        Integer type = Types.getType(payload.getClass());
-        if (type == null) {
-            throw new IllegalArgumentException("No mapping for type " + 
payload.getClass().getName());
-        }
-        int version = Types.getVersion(payload.getClass());
-        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, 
PARTITION, null, null, payload.toByteArray(), toHeaders(type, version));
-        try {
-            RecordMetadata metadata = producer.send(record).get();
-            LOG.debug("Sent to {}", metadata);
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new MessagingException(format("Interrupted while trying to 
send to topic %s", topic), e);
-        } catch (Exception e) {
-            handleException(topic, e);
-        }
-    }
-
-    private void handleException(String topic, Exception e) {
-        eventSender.send(e);
-        throw new MessagingException(format("Failed to send message on topic 
%s", topic), e);
-    }
-
-    private Iterable<Header> toHeaders(int type, int version) {
-        return Arrays.asList(toHeader("type", type),
-                toHeader("version",version));
-    }
-
-    private Header toHeader(String key, int value) {
-        return new RecordHeader(key, Integer.toString(value).getBytes(UTF_8));
-    }
-}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
index e4d20c4..54b8a73 100644
--- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java
@@ -31,7 +31,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.sling.distribution.journal.ExceptionEventSender;
 import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,12 +58,10 @@ public class KafkaPoller<T> implements Closeable {
         startBackgroundThread(this::run, "Message Poller");
     }
     
-    public static Closeable createProtobufPoller(KafkaConsumer<String, byte[]> 
consumer, ExceptionEventSender eventSender, HandlerAdapter<?>... adapters) {
-        return new KafkaPoller<>(consumer, eventSender, new 
ProtobufRecordHandler(adapters));
-    }
-    
-    public static <T>  Closeable createJsonPoller(KafkaConsumer<String, 
String> consumer, ExceptionEventSender eventSender, MessageHandler<T> handler, 
Class<T> clazz) {
-        return new KafkaPoller<>(consumer, eventSender, new 
JsonRecordHandler<T>(handler, clazz));
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    public static Closeable createJsonPoller(KafkaConsumer<String, String> 
consumer, ExceptionEventSender eventSender, HandlerAdapter<?> ...adapters) {
+        HandlerAdapter<?> adapter = adapters[0];
+        return new KafkaPoller<>(consumer, eventSender, new 
JsonRecordHandler(adapter.getHandler(), adapter.getType()));
     }
 
     @Override
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
deleted file mode 100644
index 4435bb4..0000000
--- 
a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.kafka;
-
-import static java.lang.Integer.parseInt;
-import static java.lang.String.format;
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Consumer;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.header.Header;
-import org.apache.kafka.common.header.Headers;
-import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingException;
-import org.apache.sling.distribution.journal.messages.Types;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.protobuf.ByteString;
-
-public class ProtobufRecordHandler implements Consumer<ConsumerRecord<String, 
byte[]>> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(ProtobufRecordHandler.class);
-
-    private final Map<Class<?>, HandlerAdapter<?>> handlers = new HashMap<>();
-
-    public ProtobufRecordHandler(HandlerAdapter<?>... handlerAdapters) {
-        for (HandlerAdapter<?> handlerAdapter : handlerAdapters) {
-            handlers.put(handlerAdapter.getType(), handlerAdapter);
-        }
-    }
-
-    @Override
-    public void accept(ConsumerRecord<String, byte[]> record) {
-        getHandler(record)
-            .ifPresent(handler->handleRecord(handler, record));
-    }
-
-    private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, 
byte[]> record) {
-        int type = parseInt(getHeaderValue(record.headers(), "type"));
-        int version = parseInt(getHeaderValue(record.headers(), "version"));
-        Class<?> messageClass = Types.getType(type, version);
-        Optional<HandlerAdapter<?>> handler = 
Optional.ofNullable(handlers.get(messageClass));
-        if (!handler.isPresent()) {
-            LOG.debug("No handler registered for type {}", 
messageClass.getName());
-        }
-        return handler;
-    }
-
-    private void handleRecord(HandlerAdapter<?> handler, 
ConsumerRecord<String, byte[]> record) {
-        MessageInfo info = new KafkaMessageInfo(record);
-        ByteString payload = ByteString.copyFrom(record.value());
-        try {
-            handler.handle(info, payload);
-        } catch (Exception e) {
-            throw new MessagingException(e.getMessage(), e);
-        }
-    }
-
-    private String getHeaderValue(Headers headers, String key) {
-        Header header = Optional.ofNullable(headers.lastHeader(key))
-            .orElseThrow(()->new IllegalArgumentException(format("Header with 
key %s not found", key)));
-        return new String(header.value(), UTF_8);
-    }
-}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
deleted file mode 100644
index 3a81f7c..0000000
--- 
a/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.kafka;
-
-import static org.hamcrest.Matchers.samePropertyValuesAs;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.sling.distribution.journal.JsonMessageSender;
-import org.apache.sling.distribution.journal.MessageInfo;
-import org.apache.sling.distribution.journal.MessagingProvider;
-import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.kafka.util.KafkaRule;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.mockito.MockitoAnnotations;
-
-/**
- * Non OSGi test for the interaction of JsonMessageSender, JsonMessagePoller
- */
-public class JsonMessagingTest {
-
-    public static class Person {
-        public String name;
-    }
-
-    private static final String TOPIC_NAME = "test";
-    
-    private Semaphore sem = new Semaphore(0);
-    private Person lastMessage;
-    
-    @ClassRule
-    public static KafkaRule kafka = new KafkaRule();
-    
-    @Before
-    public void before() {
-        MockitoAnnotations.initMocks(this);
-    }
-    
-    @Test
-    public void testSendReceive() throws InterruptedException, IOException, 
IllegalArgumentException, IllegalAccessException, NoSuchFieldException, 
SecurityException {
-        MessagingProvider provider = kafka.getProvider();
-        Person msg = new Person();
-        msg.name = "Joe";
-        Closeable poller = provider.createJsonPoller(TOPIC_NAME, 
Reset.earliest, this::handle, Person.class);
-        JsonMessageSender<Person> messageSender = provider.createJsonSender();
-        
-        messageSender.send(TOPIC_NAME, msg);
-        assertReceived();
-        assertThat(this.lastMessage, samePropertyValuesAs(msg));
-        poller.close();
-    }
-    
-    @Test
-    public void testParseError() throws InterruptedException, IOException, 
IllegalArgumentException, IllegalAccessException, NoSuchFieldException, 
SecurityException {
-        MessagingProvider provider = kafka.getProvider();
-        Closeable poller = provider.createJsonPoller(TOPIC_NAME, 
Reset.earliest, this::handle, Person.class);
-        JsonMessageSender<String> messageSender = provider.createJsonSender();
-        
-        messageSender.send(TOPIC_NAME, "broken");
-        // Log should display "Failed to parse payload"
-        assertNotReceived();
-        poller.close();
-    }
-    
-    private void assertReceived() throws InterruptedException {
-        assertTrue(sem.tryAcquire(30, TimeUnit.SECONDS));
-    }
-    
-    private void assertNotReceived() throws InterruptedException {
-        assertFalse(sem.tryAcquire(1, TimeUnit.SECONDS));
-    }
-    
-    private void handle(MessageInfo info, Person message) {
-        this.lastMessage = message;
-        this.sem.release();
-    }
-    
-}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java
index 27926bf..49704be 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java
@@ -38,8 +38,6 @@ import org.mockito.runners.MockitoJUnitRunner;
 @RunWith(MockitoJUnitRunner.class)
 public class KafkaJsonMessageSenderTest {
 
-    private static final String TOPIC = "topic";
-
     @Mock
     private ExceptionEventSender eventSender;
     
@@ -58,7 +56,7 @@ public class KafkaJsonMessageSenderTest {
         when(record.get()).thenThrow(new ExecutionException(new 
IOException("Expected")));
         Person person = new Person();
         person.name = "name";
-        sender.send(TOPIC, person);
+        sender.send(person);
     }
     
     public static class Person {
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java
deleted file mode 100644
index 5746269..0000000
--- 
a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.kafka;
-
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.MessagingException;
-import org.apache.sling.distribution.journal.messages.Messages.ClearCommand;
-import org.apache.sling.distribution.journal.messages.Messages.PingMessage;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import com.google.protobuf.GeneratedMessage;
-
-@RunWith(MockitoJUnitRunner.class)
-public class KafkaMessageSenderTest {
-
-    private static final String TOPIC = "topic";
-
-    @Mock
-    private ExceptionEventSender eventSender;
-    
-    @Mock
-    private KafkaProducer<String, byte[]> producer;
-    
-    @InjectMocks
-    private KafkaMessageSender<GeneratedMessage> sender;
-
-    @Mock
-    private Future<RecordMetadata> record;
-    
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoMapping() throws Exception {
-        GeneratedMessage payload = 
ClearCommand.newBuilder().setOffset(0l).build();
-        sender.send(TOPIC, payload);
-    }
-    
-    @Test(expected = MessagingException.class)
-    public void testSendError() throws Exception {
-        when(producer.send(Mockito.any())).thenReturn(record);
-        when(record.get()).thenThrow(new ExecutionException(new 
IOException("Expected")));
-        GeneratedMessage payload = PingMessage.newBuilder().build();
-        sender.send(TOPIC, payload);
-    }
-}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java 
b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
index 672b751..0989667 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -34,9 +34,9 @@ import org.apache.sling.distribution.journal.MessageSender;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.kafka.util.KafkaRule;
-import org.apache.sling.distribution.journal.messages.Messages.CommandMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import 
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.apache.sling.distribution.journal.messages.ClearCommand;
+import org.apache.sling.distribution.journal.messages.DiscoveryMessage;
+import org.apache.sling.distribution.journal.messages.SubscriberConfig;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -63,26 +63,25 @@ public class MessagingTest {
     
     @Test
     public void testSendReceive() throws Exception {
-        Closeable poller = provider.createPoller(topicName, Reset.earliest, 
handler);
-        MessageSender<DiscoveryMessage> messageSender = 
provider.createSender();
+        try (Closeable poller = provider.createPoller(topicName, 
Reset.earliest, provider.assignTo(0), handler)) {
+            MessageSender<DiscoveryMessage> messageSender = 
provider.createSender(topicName);
         
-        messageSender.send(topicName, createMessage());
-        assertReceived("Consumer started from earliest .. should see our 
message");
-        messageSender.send(topicName, createMessage());
-        assertReceived("Should also consume a second message");
-
-        poller.close();
+            messageSender.send(createMessage());
+            assertReceived("Consumer started from earliest .. should see our 
message");
+            messageSender.send(createMessage());
+            assertReceived("Should also consume a second message");
+        }
     }
     
     @Test
     public void testNoHandler() throws Exception {
         try (Closeable poller = provider.createPoller(topicName, 
Reset.earliest, handler)) {
-            MessageSender<CommandMessage> messageSender = 
provider.createSender();
-            CommandMessage msg = CommandMessage.newBuilder()
-                .setSubSlingId("subslingid")
-                .setSubAgentName("agentname")
+            MessageSender<ClearCommand> messageSender = 
provider.createSender(topicName);
+            ClearCommand msg = ClearCommand.builder()
+                .subSlingId("subslingid")
+                .subAgentName("agentname")
                 .build();
-            messageSender.send(topicName, msg);
+            messageSender.send(msg);
             assertNotReceived("Should not be received as we have no handler");
         }
     }
@@ -90,39 +89,38 @@ public class MessagingTest {
     @Test
     public void testAssign() throws Exception {
         DiscoveryMessage msg = createMessage();
-        MessageSender<DiscoveryMessage> messageSender = 
provider.createSender();
-        messageSender.send(topicName, msg);
+        MessageSender<DiscoveryMessage> messageSender = 
provider.createSender(topicName);
+        messageSender.send(msg);
         
         try (Closeable poller = provider.createPoller(topicName, 
Reset.earliest, handler)) {
             assertReceived("Starting from earliest .. should see our message");
         }
         long offset = lastInfo.getOffset();
         
-        String assign = "0:" + offset;
+        String assign = provider.assignTo(offset);
         try (Closeable poller = provider.createPoller(topicName, Reset.latest, 
assign, handler)) {
             assertReceived("Starting from old offset .. should see our 
message");
             assertThat(lastInfo.getOffset(), equalTo(offset));
         }
         
-        String invalid = "0:32532523453";
+        String invalid = provider.assignTo(32532523453l);
         try (Closeable poller1 = provider.createPoller(topicName, 
Reset.latest, invalid, handler)) {
-            assertNotReceived("Should see old message as we start from 
earliest");
+            assertNotReceived("Should not see message as we fall back to 
latest");
         }
         
-        String invalid1 = "0:32532523453";
-        try (Closeable poller2 = provider.createPoller(topicName, 
Reset.earliest, invalid1, handler)) {
-            assertReceived("Should not see any message as we start from 
latest");
+        try (Closeable poller2 = provider.createPoller(topicName, 
Reset.earliest, invalid, handler)) {
+            assertReceived("Should see message as we fall back to earliest");
         }
     }
 
     private DiscoveryMessage createMessage() {
-        return DiscoveryMessage.newBuilder()
-                .setSubAgentName("sub1agent")
-                .setSubSlingId("subsling")
-                .setSubscriberConfiguration(SubscriberConfiguration
-                        .newBuilder()
-                        .setEditable(false)
-                        .setMaxRetries(-1)
+        return DiscoveryMessage.builder()
+                .subAgentName("sub1agent")
+                .subSlingId("subsling")
+                .subscriberConfiguration(SubscriberConfig
+                        .builder()
+                        .editable(false)
+                        .maxRetries(-1)
                         .build())
                 .build();
     }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java
deleted file mode 100644
index 3c76ea7..0000000
--- 
a/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.distribution.journal.kafka;
-
-import static org.apache.sling.distribution.journal.HandlerAdapter.create;
-
-import java.io.IOException;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.sling.distribution.journal.ExceptionEventSender;
-import org.apache.sling.distribution.journal.MessageInfo;
-import 
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class ProtobufRecordHandlerTest {
-    @Mock
-    ExceptionEventSender eventSender;
-    
-    @Mock
-    KafkaConsumer<String, byte[]> consumer;
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testNoHeader() throws IOException, InterruptedException {
-        ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, 
byte[]>("topic", 1, 0l, 0l, TimestampType.CREATE_TIME, 0, 0, 0, "key", null);
-        ProtobufRecordHandler handler = new 
ProtobufRecordHandler(create(DiscoveryMessage.class, this::handle));
-        handler.accept(record);
-    }
-
-    private void handle(MessageInfo info, DiscoveryMessage message) {
-    }
-}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java
 
b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java
index 0d235b3..e1a92ed 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java
@@ -76,6 +76,7 @@ public class KafkaLocal implements Closeable {
         kafkaProps.put("log.dir",logDir);
         kafkaProps.put("group.initial.rebalance.delay.ms", "0");
         kafkaProps.put("group.min.session.timeout.ms", "1000");
+        kafkaProps.put("offsets.topic.replication.factor", "1");
         return kafkaProps;
     }
 

Reply via email to