This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch SLING-9504 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git
commit d7008b13367ae882a8df4b77ff78df2324289eca Author: Christian Schneider <[email protected]> AuthorDate: Mon Jun 15 18:40:58 2020 +0200 SLING-9504 - Switch from protobuf to json --- pom.xml | 5 +- .../journal/kafka/KafkaClientProvider.java | 56 ++--------- .../journal/kafka/KafkaJsonMessageSender.java | 21 ++++- .../journal/kafka/KafkaMessageInfo.java | 15 +++ .../journal/kafka/KafkaMessageSender.java | 89 ------------------ .../distribution/journal/kafka/KafkaPoller.java | 11 +-- .../journal/kafka/ProtobufRecordHandler.java | 86 ----------------- .../journal/kafka/JsonMessagingTest.java | 102 --------------------- .../journal/kafka/KafkaJsonMessageSenderTest.java | 4 +- .../journal/kafka/KafkaMessageSenderTest.java | 73 --------------- .../distribution/journal/kafka/MessagingTest.java | 60 ++++++------ .../journal/kafka/ProtobufRecordHandlerTest.java | 53 ----------- .../journal/kafka/util/KafkaLocal.java | 1 + 13 files changed, 75 insertions(+), 501 deletions(-) diff --git a/pom.xml b/pom.xml index 015966e..112e52f 100644 --- a/pom.xml +++ b/pom.xml @@ -34,7 +34,7 @@ <!-- P R O J E C T --> <!-- ======================================================================= --> <artifactId>org.apache.sling.distribution.journal.kafka</artifactId> - <version>0.1.5-SNAPSHOT</version> + <version>0.1.5-JSON-SNAPSHOT</version> <name>Apache Sling Journal Messaging based on Apache Kafka</name> <description>Implementation of Apache Sling Content Distribution Journal Messaging based on Apache Kafka.</description> @@ -70,13 +70,12 @@ <dependency> <groupId>org.apache.sling</groupId> <artifactId>org.apache.sling.distribution.journal.messages</artifactId> - <version>0.1.2</version> + <version>0.1.7-JSON-SNAPSHOT</version> </dependency> <!-- OSGi --> <dependency> <groupId>org.osgi</groupId> <artifactId>org.osgi.service.component.annotations</artifactId> - <version>1.3.0</version> <scope>provided</scope> </dependency> <dependency> diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java index 199e7fe..2a4391d 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java @@ -52,15 +52,11 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; -import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.sling.distribution.journal.ExceptionEventSender; import org.apache.sling.distribution.journal.HandlerAdapter; -import org.apache.sling.distribution.journal.JsonMessageSender; -import org.apache.sling.distribution.journal.MessageHandler; import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingException; import org.apache.sling.distribution.journal.MessagingProvider; @@ -75,13 +71,11 @@ import org.osgi.service.metatype.annotations.Designate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.protobuf.GeneratedMessage; - @Component(service = MessagingProvider.class, configurationPolicy = ConfigurationPolicy.REQUIRE) @Designate(ocd = KafkaEndpoint.class) public class KafkaClientProvider implements MessagingProvider, Closeable { - private static final Logger LOG = LoggerFactory.getLogger(KafkaClientProvider.class); + private final Logger log = LoggerFactory.getLogger(this.getClass()); public static final int PARTITION = 0; @@ -124,18 +118,14 @@ public class KafkaClientProvider implements MessagingProvider, Closeable { } @Override - public <T extends GeneratedMessage> MessageSender<T> createSender() { - return new KafkaMessageSender<>(buildKafkaProducer(), eventSender); - } - - @Override - public <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters) { - return createPoller(topicName, reset, null, adapters); + public <T> MessageSender<T> createSender(String topic) { + return new KafkaJsonMessageSender<>(buildJsonKafkaProducer(), topic, eventSender); } @Override public Closeable createPoller(String topicName, Reset reset, @Nullable String assign, HandlerAdapter<?>... adapters) { - KafkaConsumer<String, byte[]> consumer = createConsumer(ByteArrayDeserializer.class, reset); + log.info("Creating poller for topic={}, reset={}, assing={} with adapters {}.", topicName, reset, assign, adapters); + KafkaConsumer<String, String> consumer = createConsumer(StringDeserializer.class, reset); TopicPartition topicPartition = new TopicPartition(topicName, PARTITION); Collection<TopicPartition> topicPartitions = singleton(topicPartition); consumer.assign(topicPartitions); @@ -146,28 +136,7 @@ public class KafkaClientProvider implements MessagingProvider, Closeable { } else { consumer.seekToEnd(topicPartitions); } - Closeable poller = KafkaPoller.createProtobufPoller(consumer, eventSender, adapters); - LOG.info("Created poller for reset {}, topicName {}, assign {}", reset, topicName, assign); - return poller; - } - - @Override - public <T> JsonMessageSender<T> createJsonSender() { - return new KafkaJsonMessageSender<>(buildJsonKafkaProducer(), eventSender); - } - - @Override - public <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler, Class<T> type) { - KafkaConsumer<String, String> consumer = createConsumer(StringDeserializer.class, reset); - TopicPartition topicPartition = new TopicPartition(topicName, PARTITION); - Collection<TopicPartition> topicPartitions = singleton(topicPartition); - consumer.assign(topicPartitions); - if (reset == Reset.earliest) { - consumer.seekToBeginning(topicPartitions); - } else { - consumer.seekToEnd(topicPartitions); - } - return KafkaPoller.createJsonPoller(consumer, eventSender, handler, type); + return KafkaPoller.createJsonPoller(consumer, eventSender, adapters); } @Override @@ -223,17 +192,6 @@ public class KafkaClientProvider implements MessagingProvider, Closeable { } @Nonnull - private synchronized KafkaProducer<String, byte[]> buildKafkaProducer() { - if (rawProducer == null) { - try (CLSwitch switcher = new CLSwitch(KafkaProducer.class.getClassLoader())) { - rawProducer = new KafkaProducer<>(producerConfig(ByteArraySerializer.class)); - } - rawProducer = new KafkaProducer<>(producerConfig(ByteArraySerializer.class)); - } - return rawProducer; - } - - @Nonnull private synchronized KafkaProducer<String, String> buildJsonKafkaProducer() { if (jsonProducer == null) { jsonProducer = new KafkaProducer<>(producerConfig(StringSerializer.class)); @@ -277,6 +235,4 @@ public class KafkaClientProvider implements MessagingProvider, Closeable { } return Long.parseLong(chunks[1]); } - - } diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java index 9951a79..ade6833 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java @@ -22,11 +22,14 @@ import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION; +import java.util.Collections; +import java.util.Map; + import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.sling.distribution.journal.ExceptionEventSender; -import org.apache.sling.distribution.journal.JsonMessageSender; +import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +37,7 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; -public class KafkaJsonMessageSender<T> implements JsonMessageSender<T> { +public class KafkaJsonMessageSender<T> implements MessageSender<T> { private static final Logger LOG = LoggerFactory.getLogger(KafkaJsonMessageSender.class); @@ -44,16 +47,25 @@ public class KafkaJsonMessageSender<T> implements JsonMessageSender<T> { private final ExceptionEventSender eventSender; - public KafkaJsonMessageSender(KafkaProducer<String, String> producer, ExceptionEventSender eventSender) { + private final String topic; + + public KafkaJsonMessageSender(KafkaProducer<String, String> producer, String topic, ExceptionEventSender eventSender) { + this.topic = topic; this.eventSender = eventSender; this.producer = requireNonNull(producer); } + + @Override + public void send(T payload) throws MessagingException { + send(payload, Collections.emptyMap()); + } + /** * {@inheritDoc} */ @Override - public void send(String topic, T payload) { + public void send(T payload, Map<String, String> properties) throws MessagingException { try { ObjectWriter writer = mapper.writerFor(payload.getClass()); String payloadSt = writer.writeValueAsString(payload); @@ -65,4 +77,5 @@ public class KafkaJsonMessageSender<T> implements JsonMessageSender<T> { throw new MessagingException(format("Failed to send JSON message on topic %s", topic), e); } } + } diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java index 596ecc5..542a152 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java @@ -18,6 +18,9 @@ */ package org.apache.sling.distribution.journal.kafka; +import java.util.Collections; +import java.util.Map; + import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.sling.distribution.journal.MessageInfo; @@ -27,8 +30,14 @@ public class KafkaMessageInfo implements MessageInfo { private final int partition; private final long offset; private final long createTime; + private final Map<String, String> props; public KafkaMessageInfo(ConsumerRecord<String, ?> record) { + this(record, Collections.emptyMap()); + } + + public KafkaMessageInfo(ConsumerRecord<String, ?> record, Map<String, String> props) { + this.props = props; this.topic = record.topic(); this.partition = record.partition(); this.offset = record.offset(); @@ -52,8 +61,14 @@ public class KafkaMessageInfo implements MessageInfo { } @Override + public Map<String, String> getProps() { + return props; + } + + @Override public String toString() { return String.format("Topic: %s, Partition: %d, Offset: %d, CreateTime: %d", topic, partition, offset, createTime); } + } diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java deleted file mode 100644 index 70e5a6a..0000000 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.kafka; - -import java.util.Arrays; - -import org.apache.sling.distribution.journal.messages.Types; -import org.apache.sling.distribution.journal.ExceptionEventSender; -import org.apache.sling.distribution.journal.MessageSender; -import org.apache.sling.distribution.journal.MessagingException; -import com.google.protobuf.GeneratedMessage; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.internals.RecordHeader; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static java.lang.String.format; -import static java.nio.charset.StandardCharsets.UTF_8; -import static java.util.Objects.requireNonNull; -import static org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION; - -public class KafkaMessageSender<T extends GeneratedMessage> implements MessageSender<T> { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageSender.class); - - private final KafkaProducer<String, byte[]> producer; - - private final ExceptionEventSender eventSender; - - public KafkaMessageSender(KafkaProducer<String, byte[]> producer, ExceptionEventSender eventSender) { - this.eventSender = requireNonNull(eventSender); - this.producer = requireNonNull(producer); - } - - /** - * {@inheritDoc} - */ - @Override - public void send(String topic, T payload) { - Integer type = Types.getType(payload.getClass()); - if (type == null) { - throw new IllegalArgumentException("No mapping for type " + payload.getClass().getName()); - } - int version = Types.getVersion(payload.getClass()); - ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, PARTITION, null, null, payload.toByteArray(), toHeaders(type, version)); - try { - RecordMetadata metadata = producer.send(record).get(); - LOG.debug("Sent to {}", metadata); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(format("Interrupted while trying to send to topic %s", topic), e); - } catch (Exception e) { - handleException(topic, e); - } - } - - private void handleException(String topic, Exception e) { - eventSender.send(e); - throw new MessagingException(format("Failed to send message on topic %s", topic), e); - } - - private Iterable<Header> toHeaders(int type, int version) { - return Arrays.asList(toHeader("type", type), - toHeader("version",version)); - } - - private Header toHeader(String key, int value) { - return new RecordHeader(key, Integer.toString(value).getBytes(UTF_8)); - } -} diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java index e4d20c4..54b8a73 100644 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java +++ b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaPoller.java @@ -31,7 +31,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.errors.WakeupException; import org.apache.sling.distribution.journal.ExceptionEventSender; import org.apache.sling.distribution.journal.HandlerAdapter; -import org.apache.sling.distribution.journal.MessageHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,12 +58,10 @@ public class KafkaPoller<T> implements Closeable { startBackgroundThread(this::run, "Message Poller"); } - public static Closeable createProtobufPoller(KafkaConsumer<String, byte[]> consumer, ExceptionEventSender eventSender, HandlerAdapter<?>... adapters) { - return new KafkaPoller<>(consumer, eventSender, new ProtobufRecordHandler(adapters)); - } - - public static <T> Closeable createJsonPoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, MessageHandler<T> handler, Class<T> clazz) { - return new KafkaPoller<>(consumer, eventSender, new JsonRecordHandler<T>(handler, clazz)); + @SuppressWarnings({ "rawtypes", "unchecked" }) + public static Closeable createJsonPoller(KafkaConsumer<String, String> consumer, ExceptionEventSender eventSender, HandlerAdapter<?> ...adapters) { + HandlerAdapter<?> adapter = adapters[0]; + return new KafkaPoller<>(consumer, eventSender, new JsonRecordHandler(adapter.getHandler(), adapter.getType())); } @Override diff --git a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java b/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java deleted file mode 100644 index 4435bb4..0000000 --- a/src/main/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandler.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.kafka; - -import static java.lang.Integer.parseInt; -import static java.lang.String.format; -import static java.nio.charset.StandardCharsets.UTF_8; - -import java.util.HashMap; -import java.util.Map; -import java.util.Optional; -import java.util.function.Consumer; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -import org.apache.sling.distribution.journal.HandlerAdapter; -import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.MessagingException; -import org.apache.sling.distribution.journal.messages.Types; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.protobuf.ByteString; - -public class ProtobufRecordHandler implements Consumer<ConsumerRecord<String, byte[]>> { - - private static final Logger LOG = LoggerFactory.getLogger(ProtobufRecordHandler.class); - - private final Map<Class<?>, HandlerAdapter<?>> handlers = new HashMap<>(); - - public ProtobufRecordHandler(HandlerAdapter<?>... handlerAdapters) { - for (HandlerAdapter<?> handlerAdapter : handlerAdapters) { - handlers.put(handlerAdapter.getType(), handlerAdapter); - } - } - - @Override - public void accept(ConsumerRecord<String, byte[]> record) { - getHandler(record) - .ifPresent(handler->handleRecord(handler, record)); - } - - private Optional<HandlerAdapter<?>> getHandler(ConsumerRecord<String, byte[]> record) { - int type = parseInt(getHeaderValue(record.headers(), "type")); - int version = parseInt(getHeaderValue(record.headers(), "version")); - Class<?> messageClass = Types.getType(type, version); - Optional<HandlerAdapter<?>> handler = Optional.ofNullable(handlers.get(messageClass)); - if (!handler.isPresent()) { - LOG.debug("No handler registered for type {}", messageClass.getName()); - } - return handler; - } - - private void handleRecord(HandlerAdapter<?> handler, ConsumerRecord<String, byte[]> record) { - MessageInfo info = new KafkaMessageInfo(record); - ByteString payload = ByteString.copyFrom(record.value()); - try { - handler.handle(info, payload); - } catch (Exception e) { - throw new MessagingException(e.getMessage(), e); - } - } - - private String getHeaderValue(Headers headers, String key) { - Header header = Optional.ofNullable(headers.lastHeader(key)) - .orElseThrow(()->new IllegalArgumentException(format("Header with key %s not found", key))); - return new String(header.value(), UTF_8); - } -} diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java deleted file mode 100644 index 3a81f7c..0000000 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.kafka; - -import static org.hamcrest.Matchers.samePropertyValuesAs; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import org.apache.sling.distribution.journal.JsonMessageSender; -import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.MessagingProvider; -import org.apache.sling.distribution.journal.Reset; -import org.apache.sling.distribution.journal.kafka.util.KafkaRule; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Test; -import org.mockito.MockitoAnnotations; - -/** - * Non OSGi test for the interaction of JsonMessageSender, JsonMessagePoller - */ -public class JsonMessagingTest { - - public static class Person { - public String name; - } - - private static final String TOPIC_NAME = "test"; - - private Semaphore sem = new Semaphore(0); - private Person lastMessage; - - @ClassRule - public static KafkaRule kafka = new KafkaRule(); - - @Before - public void before() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testSendReceive() throws InterruptedException, IOException, IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException { - MessagingProvider provider = kafka.getProvider(); - Person msg = new Person(); - msg.name = "Joe"; - Closeable poller = provider.createJsonPoller(TOPIC_NAME, Reset.earliest, this::handle, Person.class); - JsonMessageSender<Person> messageSender = provider.createJsonSender(); - - messageSender.send(TOPIC_NAME, msg); - assertReceived(); - assertThat(this.lastMessage, samePropertyValuesAs(msg)); - poller.close(); - } - - @Test - public void testParseError() throws InterruptedException, IOException, IllegalArgumentException, IllegalAccessException, NoSuchFieldException, SecurityException { - MessagingProvider provider = kafka.getProvider(); - Closeable poller = provider.createJsonPoller(TOPIC_NAME, Reset.earliest, this::handle, Person.class); - JsonMessageSender<String> messageSender = provider.createJsonSender(); - - messageSender.send(TOPIC_NAME, "broken"); - // Log should display "Failed to parse payload" - assertNotReceived(); - poller.close(); - } - - private void assertReceived() throws InterruptedException { - assertTrue(sem.tryAcquire(30, TimeUnit.SECONDS)); - } - - private void assertNotReceived() throws InterruptedException { - assertFalse(sem.tryAcquire(1, TimeUnit.SECONDS)); - } - - private void handle(MessageInfo info, Person message) { - this.lastMessage = message; - this.sem.release(); - } - -} diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java index 27926bf..49704be 100644 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSenderTest.java @@ -38,8 +38,6 @@ import org.mockito.runners.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) public class KafkaJsonMessageSenderTest { - private static final String TOPIC = "topic"; - @Mock private ExceptionEventSender eventSender; @@ -58,7 +56,7 @@ public class KafkaJsonMessageSenderTest { when(record.get()).thenThrow(new ExecutionException(new IOException("Expected"))); Person person = new Person(); person.name = "name"; - sender.send(TOPIC, person); + sender.send(person); } public static class Person { diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java deleted file mode 100644 index 5746269..0000000 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSenderTest.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.kafka; - -import static org.mockito.Mockito.when; - -import java.io.IOException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.sling.distribution.journal.ExceptionEventSender; -import org.apache.sling.distribution.journal.MessagingException; -import org.apache.sling.distribution.journal.messages.Messages.ClearCommand; -import org.apache.sling.distribution.journal.messages.Messages.PingMessage; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.runners.MockitoJUnitRunner; - -import com.google.protobuf.GeneratedMessage; - -@RunWith(MockitoJUnitRunner.class) -public class KafkaMessageSenderTest { - - private static final String TOPIC = "topic"; - - @Mock - private ExceptionEventSender eventSender; - - @Mock - private KafkaProducer<String, byte[]> producer; - - @InjectMocks - private KafkaMessageSender<GeneratedMessage> sender; - - @Mock - private Future<RecordMetadata> record; - - - @Test(expected = IllegalArgumentException.class) - public void testNoMapping() throws Exception { - GeneratedMessage payload = ClearCommand.newBuilder().setOffset(0l).build(); - sender.send(TOPIC, payload); - } - - @Test(expected = MessagingException.class) - public void testSendError() throws Exception { - when(producer.send(Mockito.any())).thenReturn(record); - when(record.get()).thenThrow(new ExecutionException(new IOException("Expected"))); - GeneratedMessage payload = PingMessage.newBuilder().build(); - sender.send(TOPIC, payload); - } -} diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java index 672b751..0989667 100644 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java @@ -34,9 +34,9 @@ import org.apache.sling.distribution.journal.MessageSender; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.kafka.util.KafkaRule; -import org.apache.sling.distribution.journal.messages.Messages.CommandMessage; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; -import org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration; +import org.apache.sling.distribution.journal.messages.ClearCommand; +import org.apache.sling.distribution.journal.messages.DiscoveryMessage; +import org.apache.sling.distribution.journal.messages.SubscriberConfig; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -63,26 +63,25 @@ public class MessagingTest { @Test public void testSendReceive() throws Exception { - Closeable poller = provider.createPoller(topicName, Reset.earliest, handler); - MessageSender<DiscoveryMessage> messageSender = provider.createSender(); + try (Closeable poller = provider.createPoller(topicName, Reset.earliest, provider.assignTo(0), handler)) { + MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName); - messageSender.send(topicName, createMessage()); - assertReceived("Consumer started from earliest .. should see our message"); - messageSender.send(topicName, createMessage()); - assertReceived("Should also consume a second message"); - - poller.close(); + messageSender.send(createMessage()); + assertReceived("Consumer started from earliest .. should see our message"); + messageSender.send(createMessage()); + assertReceived("Should also consume a second message"); + } } @Test public void testNoHandler() throws Exception { try (Closeable poller = provider.createPoller(topicName, Reset.earliest, handler)) { - MessageSender<CommandMessage> messageSender = provider.createSender(); - CommandMessage msg = CommandMessage.newBuilder() - .setSubSlingId("subslingid") - .setSubAgentName("agentname") + MessageSender<ClearCommand> messageSender = provider.createSender(topicName); + ClearCommand msg = ClearCommand.builder() + .subSlingId("subslingid") + .subAgentName("agentname") .build(); - messageSender.send(topicName, msg); + messageSender.send(msg); assertNotReceived("Should not be received as we have no handler"); } } @@ -90,39 +89,38 @@ public class MessagingTest { @Test public void testAssign() throws Exception { DiscoveryMessage msg = createMessage(); - MessageSender<DiscoveryMessage> messageSender = provider.createSender(); - messageSender.send(topicName, msg); + MessageSender<DiscoveryMessage> messageSender = provider.createSender(topicName); + messageSender.send(msg); try (Closeable poller = provider.createPoller(topicName, Reset.earliest, handler)) { assertReceived("Starting from earliest .. should see our message"); } long offset = lastInfo.getOffset(); - String assign = "0:" + offset; + String assign = provider.assignTo(offset); try (Closeable poller = provider.createPoller(topicName, Reset.latest, assign, handler)) { assertReceived("Starting from old offset .. should see our message"); assertThat(lastInfo.getOffset(), equalTo(offset)); } - String invalid = "0:32532523453"; + String invalid = provider.assignTo(32532523453l); try (Closeable poller1 = provider.createPoller(topicName, Reset.latest, invalid, handler)) { - assertNotReceived("Should see old message as we start from earliest"); + assertNotReceived("Should not see message as we fall back to latest"); } - String invalid1 = "0:32532523453"; - try (Closeable poller2 = provider.createPoller(topicName, Reset.earliest, invalid1, handler)) { - assertReceived("Should not see any message as we start from latest"); + try (Closeable poller2 = provider.createPoller(topicName, Reset.earliest, invalid, handler)) { + assertReceived("Should see message as we fall back to earliest"); } } private DiscoveryMessage createMessage() { - return DiscoveryMessage.newBuilder() - .setSubAgentName("sub1agent") - .setSubSlingId("subsling") - .setSubscriberConfiguration(SubscriberConfiguration - .newBuilder() - .setEditable(false) - .setMaxRetries(-1) + return DiscoveryMessage.builder() + .subAgentName("sub1agent") + .subSlingId("subsling") + .subscriberConfiguration(SubscriberConfig + .builder() + .editable(false) + .maxRetries(-1) .build()) .build(); } diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java b/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java deleted file mode 100644 index 3c76ea7..0000000 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/ProtobufRecordHandlerTest.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.sling.distribution.journal.kafka; - -import static org.apache.sling.distribution.journal.HandlerAdapter.create; - -import java.io.IOException; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.common.record.TimestampType; -import org.apache.sling.distribution.journal.ExceptionEventSender; -import org.apache.sling.distribution.journal.MessageInfo; -import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - -@RunWith(MockitoJUnitRunner.class) -public class ProtobufRecordHandlerTest { - @Mock - ExceptionEventSender eventSender; - - @Mock - KafkaConsumer<String, byte[]> consumer; - - @Test(expected = IllegalArgumentException.class) - public void testNoHeader() throws IOException, InterruptedException { - ConsumerRecord<String, byte[]> record = new ConsumerRecord<String, byte[]>("topic", 1, 0l, 0l, TimestampType.CREATE_TIME, 0, 0, 0, "key", null); - ProtobufRecordHandler handler = new ProtobufRecordHandler(create(DiscoveryMessage.class, this::handle)); - handler.accept(record); - } - - private void handle(MessageInfo info, DiscoveryMessage message) { - } -} diff --git a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java index 0d235b3..e1a92ed 100644 --- a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java +++ b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java @@ -76,6 +76,7 @@ public class KafkaLocal implements Closeable { kafkaProps.put("log.dir",logDir); kafkaProps.put("group.initial.rebalance.delay.ms", "0"); kafkaProps.put("group.min.session.timeout.ms", "1000"); + kafkaProps.put("offsets.topic.replication.factor", "1"); return kafkaProps; }
