This is an automated email from the ASF dual-hosted git repository.

tmaret pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal-kafka.git

commit 6f3786179b49a75fd062615992c6e2a823788fc1
Author: tmaret <[email protected]>
AuthorDate: Tue Apr 16 23:54:57 2019 +0200

    SLING-8346 - Import Journal based Sling Content Distribution source code
    
    This project contains an implementation of the 
`org.apache.sling.distribution.journal` messaging API based on Apache Kafka
    
    Initial contributors
    
    *  Timothee Maret @tmaret
    *  Christian Schneider @cschneider
---
 LICENSE                                            | 202 ++++++++++++++++++
 README.md                                          |   5 +
 bnd.bnd                                            |   3 +
 pom.xml                                            | 142 +++++++++++++
 .../journal/kafka/KafkaClientProvider.java         | 231 +++++++++++++++++++++
 .../distribution/journal/kafka/KafkaEndpoint.java  |  39 ++++
 .../journal/kafka/KafkaJsonMessagePoller.java      | 106 ++++++++++
 .../journal/kafka/KafkaJsonMessageSender.java      |  63 ++++++
 .../journal/kafka/KafkaMessageInfo.java            |  58 ++++++
 .../journal/kafka/KafkaMessagePoller.java          | 133 ++++++++++++
 .../journal/kafka/KafkaMessageSender.java          |  78 +++++++
 .../distribution/journal/kafka/package-info.java   |  22 ++
 .../journal/kafka/JsonMessagingTest.java           |  85 ++++++++
 .../distribution/journal/kafka/MessagingTest.java  | 136 ++++++++++++
 .../journal/kafka/util/KafkaLocal.java             |  90 ++++++++
 .../distribution/journal/kafka/util/KafkaRule.java |  73 +++++++
 .../journal/kafka/util/ZooKeeperLocal.java         |  81 ++++++++
 src/test/resources/logback.xml                     |  33 +++
 18 files changed, 1580 insertions(+)

diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..d645695
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,202 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b225925
--- /dev/null
+++ b/README.md
@@ -0,0 +1,5 @@
+org.apache.sling.distribution.journal.kafka
+===========================================
+
+Project containing an implementation of the `distribution.journal` messaging 
API based on Apache Kafka.
+
diff --git a/bnd.bnd b/bnd.bnd
new file mode 100644
index 0000000..c45d94e
--- /dev/null
+++ b/bnd.bnd
@@ -0,0 +1,3 @@
+Bundle-Category: sling
+Bundle-Description: ${project.description}
+Bundle-License: Apache License, Version 2.0
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fc9cd1c
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <!-- 
======================================================================= -->
+    <!-- P A R E N T   P R O J E C T                                           
  -->
+    <!-- 
======================================================================= -->
+    <parent>
+        <groupId>org.apache.sling</groupId>
+        <artifactId>sling</artifactId>
+        <version>34</version>
+        <relativePath />
+    </parent>
+
+    <!-- 
======================================================================= -->
+    <!-- P R O J E C T                                                         
  -->
+    <!-- 
======================================================================= -->
+    <artifactId>org.apache.sling.distribution.journal.kafka</artifactId>
+    <version>0.0.1-INCUBATOR-SNAPSHOT</version>
+
+    <name>Apache Sling Journal Messaging based on Apache Kafka</name>
+    <description>Implementation of Apache Sling Content Distribution Journal 
Messaging based on Apache Kafka.</description>
+
+    <properties>
+        <sling.java.version>8</sling.java.version>
+    </properties>
+
+    <scm/>
+
+    <!-- 
======================================================================= -->
+    <!-- B U I L D                                                             
  -->
+    <!-- 
======================================================================= -->
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>biz.aQute.bnd</groupId>
+                <artifactId>bnd-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+    <!-- 
======================================================================= -->
+    <!-- D E P E N D E N C I E S                                               
  -->
+    <!-- 
======================================================================= -->
+    <dependencies>
+        <!-- Sling -->
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.distribution.journal</artifactId>
+            <version>0.0.1-INCUBATOR-SNAPSHOT</version>
+        </dependency>
+        <!-- OSGi -->
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.component.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.metatype.annotations</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <!-- Apache Kafka -->
+        <dependency>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
+            <version>2.1.0_1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.9.7</version>
+        </dependency>
+        
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.12</artifactId>
+            <version>2.1.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.servicemix.bundles</groupId>
+            <artifactId>org.apache.servicemix.bundles.hamcrest</artifactId>
+            <version>1.3_1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <!-- Newer version seems to have a problem in OSGi: -->
+            <!-- java.lang.ClassNotFoundException: 
junit.framework.ComparisonFailure not found by org.mockito.mockito-all [203] -->
+            <!-- So we use the older version that works -->
+            <version>1.9.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.converter</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.2.3</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+    <reporting>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>findbugs-maven-plugin</artifactId>
+                <version>2.5.3</version>
+            </plugin>
+        </plugins>
+    </reporting>
+</project>
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
new file mode 100644
index 0000000..dfc0936
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaClientProvider.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static java.lang.String.format;
+import static java.util.Collections.singleton;
+import static java.util.Collections.unmodifiableMap;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.ACKS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.JsonMessageSender;
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingException;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.ConfigurationPolicy;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.metatype.annotations.Designate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.GeneratedMessage;
+
+/**
+ * {@link MessagingProvider} implementation backed by Apache Kafka.
+ *
+ * <p>Pollers created by this provider are manually assigned to partition
+ * {@link #PARTITION} of the requested topic. Every consumer is configured with a
+ * random consumer group id and with auto-commit disabled. The raw and the JSON
+ * producer are created lazily on first use, reused for all senders and closed in
+ * {@link #close()}.</p>
+ */
+@Component(service = MessagingProvider.class, configurationPolicy = ConfigurationPolicy.REQUIRE)
+@Designate(ocd = KafkaEndpoint.class)
+public class KafkaClientProvider implements MessagingProvider, Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaClientProvider.class);
+
+    /** The single topic partition that pollers are assigned to. */
+    public static final int PARTITION = 0;
+
+    /** Lazily created producer for binary payloads, see {@link #buildKafkaProducer()}. */
+    private volatile KafkaProducer<String, byte[]> rawProducer = null;
+
+    /** Lazily created producer for JSON payloads, see {@link #buildJsonKafkaProducer()}. */
+    private volatile KafkaProducer<String, String> jsonProducer = null;
+
+    private String kafkaBootstrapServers;
+
+    private int requestTimeout;
+
+    private int defaultApiTimeout;
+
+    /**
+     * Reads the Kafka connection settings from the component configuration.
+     *
+     * @param kafkaEndpoint the component configuration
+     * @throws NullPointerException if the configuration holds no bootstrap servers
+     */
+    @Activate
+    public void activate(KafkaEndpoint kafkaEndpoint) {
+        kafkaBootstrapServers = requireNonNull(kafkaEndpoint.kafkaBootstrapServers());
+        requestTimeout = kafkaEndpoint.kafkaRequestTimeout();
+        defaultApiTimeout = kafkaEndpoint.kafkaDefaultApiTimeout();
+    }
+
+    /**
+     * Closes the shared producers. Producers that were never created are skipped.
+     */
+    @Deactivate
+    public void close() {
+        IOUtils.closeQuietly(rawProducer);
+        IOUtils.closeQuietly(jsonProducer);
+    }
+
+    /**
+     * @return a sender that publishes through the shared binary producer
+     */
+    @Override
+    public <T extends GeneratedMessage> MessageSender<T> createSender() {
+        return new KafkaMessageSender<>(buildKafkaProducer());
+    }
+
+    /**
+     * Creates a poller positioned according to {@code reset}.
+     *
+     * @param topicName the topic to read from
+     * @param reset where to start reading: the beginning for {@code earliest}, the end otherwise
+     * @param adapters the handlers the poller is created with
+     * @return the poller; closing it is the caller's responsibility
+     */
+    @Override
+    public <T> Closeable createPoller(String topicName, Reset reset, HandlerAdapter<?>... adapters) {
+        return createPoller(topicName, reset, null, adapters);
+    }
+
+    /**
+     * Creates a poller positioned at an explicit offset or, when {@code assign} is
+     * {@code null}, according to {@code reset}.
+     *
+     * @param topicName the topic to read from
+     * @param reset where to start reading when {@code assign} is {@code null}
+     * @param assign a position in the format produced by {@link #assignTo(long)}, or {@code null}
+     * @param adapters the handlers the poller is created with
+     * @return the poller; closing it is the caller's responsibility
+     * @throws IllegalArgumentException if {@code assign} is malformed
+     */
+    @Override
+    public Closeable createPoller(String topicName, Reset reset, @Nullable String assign, HandlerAdapter<?>... adapters) {
+        String consumerGroupId = UUID.randomUUID().toString();
+        KafkaConsumer<String, byte[]> consumer = openConsumer(ByteArrayDeserializer.class, consumerGroupId, reset, topicName, assign);
+        Closeable poller = new KafkaMessagePoller(consumer, adapters);
+        // Parameterized logging: the message is only rendered when INFO is enabled
+        LOG.info("Created poller for consumerGroupId {}, reset {}, topicName {}, assign {}", consumerGroupId, reset, topicName, assign);
+        return poller;
+    }
+
+    /**
+     * @return a sender that publishes through the shared JSON producer
+     */
+    @Override
+    public <T> JsonMessageSender<T> createJsonSender() {
+        return new KafkaJsonMessageSender<>(buildJsonKafkaProducer());
+    }
+
+    /**
+     * Creates a poller for JSON messages positioned according to {@code reset}.
+     *
+     * @param topicName the topic to read from
+     * @param reset where to start reading: the beginning for {@code earliest}, the end otherwise
+     * @param handler the handler the poller is created with
+     * @param type the message type the poller is created with
+     * @return the poller; closing it is the caller's responsibility
+     */
+    @Override
+    public <T> Closeable createJsonPoller(String topicName, Reset reset, MessageHandler<T> handler, Class<T> type) {
+        String consumerGroupId = UUID.randomUUID().toString();
+        KafkaConsumer<String, String> consumer = openConsumer(StringDeserializer.class, consumerGroupId, reset, topicName, null);
+        return new KafkaJsonMessagePoller<>(consumer, handler, type);
+    }
+
+    /**
+     * Verifies that the given topic is known to the Kafka cluster.
+     *
+     * @param topic the topic name to look up
+     * @throws MessagingException if the topic list cannot be loaded, or if the topic does not exist
+     */
+    @Override
+    public void assertTopic(String topic) throws MessagingException {
+        String consumerGroupId = UUID.randomUUID().toString();
+        boolean topicExists;
+        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerConfig(StringDeserializer.class, consumerGroupId, Reset.latest))) {
+            topicExists = consumer.listTopics().containsKey(topic);
+        } catch (Exception e) {
+            throw new MessagingException(format("Unable to load topic stats for %s", topic), e);
+        }
+        // Checked outside the try block so that this failure is reported as is instead of
+        // being wrapped into the "Unable to load topic stats" error by the catch clause above.
+        if (!topicExists) {
+            throw new MessagingException(format("Topic %s does not exist", topic));
+        }
+    }
+
+    /**
+     * Looks up the first or the last offset of partition {@link #PARTITION} of a topic.
+     *
+     * @param topicName the topic to inspect
+     * @param reset {@code earliest} for the beginning offset, anything else for the end offset
+     * @return the requested offset
+     */
+    @Override
+    public long retrieveOffset(String topicName, Reset reset) {
+        String groupId = UUID.randomUUID().toString();
+        TopicPartition topicPartition = new TopicPartition(topicName, PARTITION);
+        Collection<TopicPartition> topicPartitions = singleton(topicPartition);
+        // try-with-resources: the consumer is closed even when the offset lookup fails
+        try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig(ByteArrayDeserializer.class, groupId, reset))) {
+            final Map<TopicPartition, Long> offsets;
+            if (reset == Reset.earliest) {
+                offsets = consumer.beginningOffsets(topicPartitions);
+            } else {
+                offsets = consumer.endOffsets(topicPartitions);
+            }
+            return offsets.get(topicPartition);
+        }
+    }
+
+    /**
+     * Encodes an offset as {@code <partition>:<offset>}, the format accepted as the
+     * {@code assign} argument of {@link #createPoller(String, Reset, String, HandlerAdapter...)}.
+     *
+     * @param offset the offset to encode
+     * @return the encoded position
+     */
+    @Override
+    public String assignTo(long offset) {
+        return format("%s:%s", PARTITION, offset);
+    }
+
+    /**
+     * Creates a consumer, assigns it to partition {@link #PARTITION} of the topic and
+     * moves it to the requested position. The consumer is closed again if any of these
+     * steps fails, so that a failed attempt does not leak it.
+     *
+     * @param deserializer the value deserializer class
+     * @param consumerGroupId the consumer group id
+     * @param reset where to start reading when {@code assign} is {@code null}
+     * @param topicName the topic to read from
+     * @param assign a position in the format produced by {@link #assignTo(long)}, or {@code null}
+     * @return the positioned consumer
+     */
+    private <V> KafkaConsumer<String, V> openConsumer(Class<?> deserializer, String consumerGroupId, Reset reset,
+            String topicName, @Nullable String assign) {
+        KafkaConsumer<String, V> consumer = new KafkaConsumer<>(consumerConfig(deserializer, consumerGroupId, reset));
+        try {
+            TopicPartition topicPartition = new TopicPartition(topicName, PARTITION);
+            Collection<TopicPartition> topicPartitions = singleton(topicPartition);
+            consumer.assign(topicPartitions);
+            if (assign != null) {
+                consumer.seek(topicPartition, offset(assign));
+            } else if (reset == Reset.earliest) {
+                consumer.seekToBeginning(topicPartitions);
+            } else {
+                consumer.seekToEnd(topicPartitions);
+            }
+            return consumer;
+        } catch (RuntimeException e) {
+            try {
+                consumer.close();
+            } catch (RuntimeException closeFailure) {
+                // Keep the original failure as the primary error
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * @return the shared binary producer, created on first call
+     */
+    @Nonnull
+    private synchronized KafkaProducer<String, byte[]> buildKafkaProducer() {
+        if (rawProducer == null) {
+            rawProducer = new KafkaProducer<>(producerConfig(ByteArraySerializer.class));
+        }
+        return rawProducer;
+    }
+
+    /**
+     * @return the shared JSON producer, created on first call
+     */
+    @Nonnull
+    private synchronized KafkaProducer<String, String> buildJsonKafkaProducer() {
+        if (jsonProducer == null) {
+            jsonProducer = new KafkaProducer<>(producerConfig(StringSerializer.class));
+        }
+        return jsonProducer;
+    }
+
+    /**
+     * Builds the consumer configuration: String keys, the given value deserializer,
+     * auto-commit disabled and the configured default API timeout.
+     *
+     * @param deserializer the value deserializer class
+     * @param consumerGroupId the consumer group id
+     * @param reset the auto offset reset policy
+     * @return the consumer configuration
+     */
+    private Map<String, Object> consumerConfig(Class<?> deserializer, String consumerGroupId, Reset reset) {
+        Map<String, Object> config = new HashMap<>();
+        config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
+        config.put(GROUP_ID_CONFIG, consumerGroupId);
+        config.put(ENABLE_AUTO_COMMIT_CONFIG, false);
+        config.put(DEFAULT_API_TIMEOUT_MS_CONFIG, defaultApiTimeout);
+        config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        config.put(VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
+        config.put(AUTO_OFFSET_RESET_CONFIG, reset.name());
+        return config;
+    }
+
+    /**
+     * Builds the producer configuration: String keys, the given value serializer,
+     * the configured request timeout and {@code acks=all}.
+     *
+     * @param serializer the value serializer class
+     * @return the read-only producer configuration
+     */
+    private Map<String, Object> producerConfig(Class<?> serializer) {
+        Map<String, Object> config = new HashMap<>();
+        config.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        config.put(VALUE_SERIALIZER_CLASS_CONFIG, serializer);
+        config.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
+        config.put(REQUEST_TIMEOUT_MS_CONFIG, requestTimeout);
+        config.put(ACKS_CONFIG, "all");
+        return unmodifiableMap(config);
+    }
+
+    /**
+     * Extracts the offset from a position in the format produced by {@link #assignTo(long)}.
+     * The partition part is not evaluated.
+     *
+     * @param assign the encoded position
+     * @return the offset
+     * @throws IllegalArgumentException if {@code assign} does not consist of exactly two
+     *         colon separated chunks, or if the second chunk is not a number
+     */
+    private long offset(String assign) {
+        String[] chunks = assign.split(":");
+        if (chunks.length != 2) {
+            throw new IllegalArgumentException(format("Illegal assign %s", assign));
+        }
+        return Long.parseLong(chunks[1]);
+    }
+
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
new file mode 100644
index 0000000..3e5428b
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaEndpoint.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import org.osgi.service.metatype.annotations.AttributeDefinition;
+import org.osgi.service.metatype.annotations.ObjectClassDefinition;
+
+/**
+ * OSGi metatype object class definition holding the Kafka endpoint settings:
+ * the bootstrap servers and two timeouts, each with a default value.
+ */
+@ObjectClassDefinition(name = "Apache Sling Journal Distribution - Kafka 
endpoint",
+        description = "Apache Kafka Endpoint")
+public @interface KafkaEndpoint {
+
+    /** Comma separated host/port pairs; defaults to "localhost:9092". */
+    @AttributeDefinition(name = "Kafka Bootstrap Servers",
+            description = "A comma separated list of host/port pairs to use 
for establishing the initial connection to the Kafka cluster.")
+    String kafkaBootstrapServers() default "localhost:9092";
+
+    /** Request timeout in milliseconds; defaults to 32000. */
+    @AttributeDefinition(name = "Kafka Request Timeout",
+            description = "Kafka Request Timeout in ms.")
+    int kafkaRequestTimeout() default 32000;
+
+    /** Default API timeout in milliseconds; defaults to 60000. */
+    @AttributeDefinition(name = "Kafka Default API Timeout",
+            description = "Kafka Default API Timeout in ms.")
+    int kafkaDefaultApiTimeout() default 60000;
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
new file mode 100644
index 0000000..0eec98f
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessagePoller.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.sling.distribution.journal.MessageHandler;
+import org.apache.sling.distribution.journal.MessageInfo;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static java.lang.String.format;
+import static java.time.Duration.ofHours;
+
+public class KafkaJsonMessagePoller<T> implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaJsonMessagePoller.class);
+
+    private volatile boolean running = true;
+
+    private final KafkaConsumer<String, String> consumer;
+
+    private final MessageHandler<T> handler;
+
+    private final ObjectReader reader;
+
+    public KafkaJsonMessagePoller(KafkaConsumer<String, String> consumer, 
MessageHandler<T> handler, Class<T> clazz) {
+        this.consumer = consumer;
+        this.handler = handler;
+        ObjectMapper mapper = new ObjectMapper();
+        reader = mapper.readerFor(clazz);
+        startBackgroundThread(this::run, format("Message Json Poller for 
handler %s", handler));
+    }
+
+    @Override
+    public void close() throws IOException {
+        LOG.info("Shutdown JSON poller for handler {}", handler);
+        running = false;
+        consumer.wakeup();
+    }
+
+    public void run() {
+        LOG.info("Start JSON poller for handler {}", handler);
+        try {
+            while(running) {
+                consume();
+            }
+        } catch (WakeupException e) {
+            if (running) {
+                LOG.error("Waked up while running {}", e.getMessage(), e);
+                throw e;
+            } else {
+                LOG.debug("Waked up while stopping {}", e.getMessage(), e);
+            }
+        } catch(Throwable t) {
+            LOG.error(format("Catch Throwable %s closing consumer", 
t.getMessage()), t);
+            throw t;
+        } finally {
+            consumer.close();
+        }
+        LOG.info("Stop JSON poller for handler {}", handler);
+    }
+
+    private void consume() {
+        consumer.poll(ofHours(1))
+                .forEach(this::handleRecord);
+    }
+
+    private void handleRecord(ConsumerRecord<String, String> record) {
+        MessageInfo info = new KafkaMessageInfo(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.timestamp());
+        String payload = record.value();
+        try {
+            T message = reader.readValue(payload);
+            handler.handle(info, message);
+        } catch (IOException e) {
+            LOG.error("Failed to parse payload {}", payload);
+        }
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
new file mode 100644
index 0000000..b07fb91
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaJsonMessageSender.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.sling.distribution.journal.JsonMessageSender;
+import org.apache.sling.distribution.journal.MessagingException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+
+/**
+ * {@link JsonMessageSender} that serializes payloads to JSON with Jackson and
+ * sends them synchronously through a Kafka producer.
+ *
+ * @param <T> payload type
+ */
+public class KafkaJsonMessageSender<T> implements JsonMessageSender<T> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(KafkaJsonMessageSender.class);
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    private final KafkaProducer<String, String> producer;
+
+    /**
+     * @param producer producer used for sending; must not be null
+     */
+    public KafkaJsonMessageSender(KafkaProducer<String, String> producer) {
+        this.producer = requireNonNull(producer);
+    }
+
+    /**
+     * {@inheritDoc}
+     *
+     * <p>Blocks until the producer reports the outcome of the send.</p>
+     *
+     * @throws MessagingException if serialization or sending fails, or if
+     *         the calling thread is interrupted while waiting
+     */
+    @Override
+    public void send(String topic, T payload) {
+        try {
+            ObjectWriter writer = mapper.writerFor(payload.getClass());
+            String payloadSt = writer.writeValueAsString(payload);
+            ProducerRecord<String, String> record =
+                    new ProducerRecord<>(topic, PARTITION, null, payloadSt);
+            RecordMetadata metadata = producer.send(record).get();
+            LOG.info("Sent JSON to {}", metadata);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers further up the stack can
+            // still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw sendFailure(topic, e);
+        } catch (Exception e) {
+            throw sendFailure(topic, e);
+        }
+    }
+
+    /** Wraps a send failure into a MessagingException naming the topic. */
+    private MessagingException sendFailure(String topic, Exception cause) {
+        return new MessagingException(
+                format("Failed to send JSON message on topic %s", topic),
+                cause);
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
new file mode 100644
index 0000000..bfa9abe
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageInfo.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.sling.distribution.journal.MessageInfo;
+
+public class KafkaMessageInfo implements MessageInfo {
+
+    private final String topic;
+    private final int partition;
+    private final long offset;
+    private final long createTime;
+
+    public KafkaMessageInfo(String topic, int partition, long offset, long 
createTime) {
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.createTime = createTime;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public int getPartition() {
+        return partition;
+    }
+
+    public long getOffset() {
+        return offset;
+    }
+
+    public long getCreateTime() {
+        return createTime;
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this);
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
new file mode 100644
index 0000000..6b22362
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessagePoller.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.distribution.journal.messages.Types;
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageInfo;
+import com.google.protobuf.ByteString;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+import static java.lang.Integer.parseInt;
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.time.Duration.ofHours;
+
+public class KafkaMessagePoller implements Closeable {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaMessagePoller.class);
+
+    private final Map<Class<?>, HandlerAdapter<?>> handlers = new HashMap<>();
+
+    private final KafkaConsumer<String, byte[]> consumer;
+
+    private volatile boolean running = true;
+
+    private final String types;
+
+    public KafkaMessagePoller(KafkaConsumer<String, byte[]> consumer, 
HandlerAdapter<?>... handlerAdapters) {
+        this.consumer = consumer;
+        for (HandlerAdapter<?> handlerAdapter : handlerAdapters) {
+            handlers.put(handlerAdapter.getType(), handlerAdapter);
+        }
+        types = handlers.keySet().toString();
+        startBackgroundThread(this::run, format("Message Poller %s", types));
+    }
+
+    @Override
+    public void close() throws IOException {
+        LOG.info("Shutdown poller for types {}", types);
+        running = false;
+        consumer.wakeup();
+    }
+
+    public void run() {
+        LOG.info("Start poller for types {}", types);
+        try {
+            while(running) {
+                consume();
+            }
+        } catch (WakeupException e) {
+            if (running) {
+                LOG.error("Waked up while running {}", e.getMessage(), e);
+                throw e;
+            } else {
+                LOG.debug("Waked up while stopping {}", e.getMessage(), e);
+            }
+        } catch(Throwable t) {
+            LOG.error(format("Catch Throwable %s closing consumer", 
t.getMessage()), t);
+            throw t;
+        } finally {
+            consumer.close();
+        }
+        LOG.info("Stop poller for types {}", types);
+    }
+
+    private void consume() {
+        consumer.poll(ofHours(1))
+                .forEach(this::handleRecord);
+    }
+
+    private void handleRecord(ConsumerRecord<String, byte[]> record) {
+        Class<?> type = Types.getType(
+                parseInt(getHeaderValue(record.headers(), "type")),
+                parseInt(getHeaderValue(record.headers(), "version")));
+        HandlerAdapter<?> adapter = handlers.get(type);
+        if (adapter != null) {
+            try {
+                handleRecord(adapter, record);
+            } catch (Exception e) {
+                String msg = format("Error consuming message for types %s", 
types);
+                LOG.warn(msg);
+            }
+        } else {
+            LOG.debug("No handler registered for type {}", type.getName());
+        }
+    }
+
+    private void handleRecord(HandlerAdapter<?> adapter, 
ConsumerRecord<String, byte[]> record) throws Exception {
+        MessageInfo info = new KafkaMessageInfo(
+                record.topic(),
+                record.partition(),
+                record.offset(),
+                record.timestamp());
+        ByteString payload = ByteString.copyFrom(record.value());
+        adapter.handle(info, payload);
+    }
+
+    private String getHeaderValue(Headers headers, String key) {
+        Header header = headers.lastHeader(key);
+        if (header == null) {
+            throw new IllegalArgumentException(format("Header with key %s not 
found", key));
+        }
+        return new String(header.value(), UTF_8);
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
new file mode 100644
index 0000000..a29bfe4
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/KafkaMessageSender.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.sling.distribution.journal.messages.Types;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingException;
+import com.google.protobuf.GeneratedMessage;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static java.lang.String.format;
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.sling.distribution.journal.kafka.KafkaClientProvider.PARTITION;
+
+public class KafkaMessageSender<T extends GeneratedMessage> implements 
MessageSender<T> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaMessageSender.class);
+
+    private final KafkaProducer<String, byte[]> producer;
+
+    public KafkaMessageSender(KafkaProducer<String, byte[]> producer) {
+        this.producer = requireNonNull(producer);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void send(String topic, T payload) {
+        Integer type = Types.getType(payload.getClass());
+        if (type == null) {
+            throw new IllegalArgumentException("No mapping for type " + 
payload.getClass().getName());
+        }
+        int version = Types.getVersion(payload.getClass());
+        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, 
PARTITION, null, null, payload.toByteArray(), toHeaders(type, version));
+        try {
+            RecordMetadata metadata = producer.send(record).get();
+            LOG.info(format("Sent to %s", metadata));
+        } catch (InterruptedException | ExecutionException e) {
+            throw new MessagingException(format("Failed to send message on 
topic %s", topic), e);
+        }
+    }
+
+    private Iterable<Header> toHeaders(int type, int version) {
+        return Arrays.asList(toHeader("type", type),
+                toHeader("version",version));
+    }
+
+    private Header toHeader(String key, int value) {
+        return new RecordHeader(key, Integer.toString(value).getBytes(UTF_8));
+    }
+}
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/kafka/package-info.java 
b/src/main/java/org/apache/sling/distribution/journal/kafka/package-info.java
new file mode 100644
index 0000000..a8928d4
--- /dev/null
+++ 
b/src/main/java/org/apache/sling/distribution/journal/kafka/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@ParametersAreNonnullByDefault
+package org.apache.sling.distribution.journal.kafka;
+
+import javax.annotation.ParametersAreNonnullByDefault;
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
new file mode 100644
index 0000000..181ab35
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/kafka/JsonMessagingTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static org.hamcrest.Matchers.samePropertyValuesAs;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.distribution.journal.JsonMessageSender;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.kafka.util.KafkaRule;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Non OSGi test for the interaction of JsonMessageSender, JsonMessagePoller
+ */
+public class JsonMessagingTest {
+
+    public static class Person {
+        public String name;
+    }
+
+    private static final String TOPIC_NAME = "test";
+    
+    private Semaphore sem = new Semaphore(0);
+    private Person lastMessage;
+    
+    @ClassRule
+    public static KafkaRule kafka = new KafkaRule();
+    
+    @Before
+    public void before() {
+        MockitoAnnotations.initMocks(this);
+    }
+    
+    @Test
+    public void testSendReceive() throws InterruptedException, IOException, 
IllegalArgumentException, IllegalAccessException, NoSuchFieldException, 
SecurityException {
+        MessagingProvider provider = kafka.getProvider();
+        Person msg = new Person();
+        msg.name = "Joe";
+        Closeable poller = provider.createJsonPoller(TOPIC_NAME, 
Reset.earliest, this::handle, Person.class);
+        JsonMessageSender<Person> messageSender = provider.createJsonSender();
+        
+        messageSender.send(TOPIC_NAME, msg);
+        assertReceived();
+        assertThat(this.lastMessage, samePropertyValuesAs(msg));
+        poller.close();
+    }
+    
+    private void assertReceived() throws InterruptedException {
+        assertTrue(sem.tryAcquire(30, TimeUnit.SECONDS));
+    }
+    
+    private void handle(MessageInfo info, Person message) {
+        this.lastMessage = message;
+        this.sem.release();
+    }
+    
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java 
b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
new file mode 100644
index 0000000..cd7b48a
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/kafka/MessagingTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.util.UUID;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageInfo;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.kafka.util.KafkaRule;
+import 
org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
+import 
org.apache.sling.distribution.journal.messages.Messages.SubscriberConfiguration;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.mockito.MockitoAnnotations;
+
+public class MessagingTest {
+
+    private String topicName;
+    private Semaphore sem = new Semaphore(0);
+    private volatile MessageInfo lastInfo;
+    
+    @ClassRule
+    public static KafkaRule kafka = new KafkaRule();
+    
+    @Before
+    public void before() {
+        MockitoAnnotations.initMocks(this);
+        topicName = "MessagingTest" + UUID.randomUUID().toString();
+    }
+    
+    @Test
+    public void testSendReceive() throws Exception {
+        MessagingProvider provider = kafka.getProvider();
+        HandlerAdapter<DiscoveryMessage> handler = 
HandlerAdapter.create(DiscoveryMessage.class, this::handle);
+        Closeable poller = provider.createPoller(topicName, Reset.earliest, 
handler);
+        DiscoveryMessage msg = DiscoveryMessage.newBuilder()
+                .setSubAgentName("sub1agent")
+                .setSubSlingId("subsling")
+                .setSubscriberConfiguration(SubscriberConfiguration
+                        .newBuilder()
+                        .setEditable(false)
+                        .setMaxRetries(-1)
+                        .build())
+                .build();
+        MessageSender<DiscoveryMessage> messageSender = 
provider.createSender();
+        
+        // After starting Kafka, sending and receiving should work
+        messageSender.send(topicName, msg);
+        assertReceived();
+
+        poller.close();
+    }
+    
+    @Test
+    public void testAssign() throws Exception {
+        MessagingProvider provider = kafka.getProvider();
+        DiscoveryMessage msg = DiscoveryMessage.newBuilder()
+                .setSubAgentName("sub1agent")
+                .setSubSlingId("subsling")
+                .setSubscriberConfiguration(SubscriberConfiguration
+                        .newBuilder()
+                        .setEditable(false)
+                        .setMaxRetries(-1)
+                        .build())
+                .build();
+        MessageSender<DiscoveryMessage> messageSender = 
provider.createSender();
+        messageSender.send(topicName, msg);
+        
+        HandlerAdapter<DiscoveryMessage> handler = 
HandlerAdapter.create(DiscoveryMessage.class, this::handle);
+        long offset;
+        try (Closeable poller = provider.createPoller(topicName, 
Reset.earliest, handler)) {
+            assertReceived();
+            offset = lastInfo.getOffset();
+        }
+        
+        // Starting from old offset .. should see our message
+        String assign = "0:" + offset;
+        try (Closeable poller = provider.createPoller(topicName, Reset.latest, 
assign, handler)) {
+            assertReceived();
+            assertThat(lastInfo.getOffset(), equalTo(offset));
+        }
+        
+        // Starting from invalid offset. Should see old message as we start 
from earliest
+        String invalid = "0:32532523453";
+        try (Closeable poller = provider.createPoller(topicName, 
Reset.earliest, invalid, handler)) {
+            assertReceived();
+        }
+        
+        // Starting from invalid offset. Should not see any message as we 
start from latest
+        try (Closeable poller = provider.createPoller(topicName, Reset.latest, 
invalid, handler)) {
+            assertNotReceived();
+        }
+    }
+
+    private void assertReceived() throws InterruptedException {
+        assertTrue(sem.tryAcquire(30, TimeUnit.SECONDS));
+    }
+    
+    private void assertNotReceived() throws InterruptedException {
+        assertFalse(sem.tryAcquire(2, TimeUnit.SECONDS));
+    }
+
+    private void handle(MessageInfo info, DiscoveryMessage message) {
+        this.lastInfo = info;
+        this.sem.release();
+    }
+    
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java
 
b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java
new file mode 100644
index 0000000..0d235b3
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaLocal.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.VerifiableProperties;
+import scala.Some;
+import scala.collection.Seq;
+
+public class KafkaLocal implements Closeable {
+    Logger LOG = LoggerFactory.getLogger(KafkaLocal.class);
+    
+    public KafkaServer kafka;
+    public ZooKeeperLocal zookeeper;
+    
+    public KafkaLocal() throws Exception {
+        this(kafkaProperties(), zookeeperProperties());
+    }
+
+    public KafkaLocal(Properties kafkaProperties, Properties zkProperties) 
throws Exception {
+        zookeeper = new ZooKeeperLocal(zkProperties);
+        KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+        Seq<KafkaMetricsReporter> reporters = 
KafkaMetricsReporter.startReporters(new VerifiableProperties(kafkaProperties));
+        
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+        kafka = new KafkaServer(kafkaConfig, Time.SYSTEM, new 
Some<String>("kafka"), reporters);
+        kafka.startup();
+    }
+
+    @Override
+    public void close() throws IOException {
+        System.out.println("stopping kafka...");
+        try {
+            kafka.shutdown();
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        zookeeper.close();
+        System.out.println("done");
+    }
+
+    private static Properties kafkaProperties() {
+        String logDir = "target/kafka-" + UUID.randomUUID().toString();
+        Properties kafkaProps = new Properties();
+        kafkaProps.put("zookeeper.connect", "localhost:2181");
+        kafkaProps.put("advertised.host.name", "localhost");
+        kafkaProps.put("host.name","localhost");
+        kafkaProps.put("port", "9092");
+        kafkaProps.put("broker.id", "0");
+        kafkaProps.put("log.dir",logDir);
+        kafkaProps.put("group.initial.rebalance.delay.ms", "0");
+        kafkaProps.put("group.min.session.timeout.ms", "1000");
+        return kafkaProps;
+    }
+
+    private static Properties zookeeperProperties() {
+        Properties zkProps = new Properties();
+        UUID uuid = UUID.randomUUID();
+        zkProps.put("dataDir", "target/zookeeper/"+uuid.toString());
+        zkProps.put("clientPort", "2181");
+        return zkProps;
+    }
+
+}
\ No newline at end of file
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java 
b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
new file mode 100644
index 0000000..8b8b74c
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/kafka/util/KafkaRule.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka.util;
+
+import static org.osgi.util.converter.Converters.standardConverter;
+
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.kafka.KafkaClientProvider;
+import org.apache.sling.distribution.journal.kafka.KafkaEndpoint;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * JUnit rule that runs the wrapped statement against a local Kafka started via {@link KafkaLocal}
+ * and exposes a {@link MessagingProvider} connected with a 5000 ms connect timeout.
+ */
+public class KafkaRule implements TestRule {
+
+    private KafkaClientProvider provider;
+
+    /**
+     * Wraps {@code base} so that it is evaluated while a local Kafka is running.
+     *
+     * @param base the statement to run
+     * @param description description of the test (unused)
+     * @return a statement that starts Kafka, evaluates {@code base} and cleans up afterwards
+     */
+    @Override
+    public Statement apply(Statement base, Description description) {
+        return new Statement() {
+
+            @Override
+            public void evaluate() throws Throwable {
+                runWithKafka(base);
+            }
+        };
+    }
+
+    /**
+     * Starts Kafka, creates a fresh provider, evaluates {@code base} and always closes the
+     * provider before Kafka itself is stopped.
+     */
+    private void runWithKafka(Statement base) throws Throwable {
+        try (KafkaLocal kafka = new KafkaLocal()) {
+            this.provider = createProvider();
+            try {
+                base.evaluate();
+            } finally {
+                // Close the client even when the test fails, otherwise it outlives the broker.
+                IOUtils.closeQuietly(this.provider);
+            }
+        }
+    }
+
+    /** Creates and activates a provider whose endpoint config only sets connectTimeout=5000. */
+    private KafkaClientProvider createProvider() {
+        KafkaClientProvider provider = new KafkaClientProvider();
+        ImmutableMap<String, String> props = ImmutableMap.of(
+                "connectTimeout", "5000");
+        KafkaEndpoint config = standardConverter().convert(props).to(KafkaEndpoint.class);
+        provider.activate(config);
+        return provider;
+    }
+    
+    /**
+     * Returns the provider, creating one on first use if the rule has not created it yet.
+     *
+     * @return the messaging provider used by the tests
+     */
+    public MessagingProvider getProvider() {
+        if (this.provider == null) {
+            this.provider = createProvider();
+        }
+        return provider;
+    }
+}
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/kafka/util/ZooKeeperLocal.java
 
b/src/test/java/org/apache/sling/distribution/journal/kafka/util/ZooKeeperLocal.java
new file mode 100644
index 0000000..3a839aa
--- /dev/null
+++ 
b/src/test/java/org/apache/sling/distribution/journal/kafka/util/ZooKeeperLocal.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.kafka.util;
+
+import java.io.Closeable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperLocal implements Closeable {
+    static Logger LOG = LoggerFactory.getLogger(ZooKeeperLocal.class);
+    MyZooKeeperServerMain zooKeeperServer;
+
+    public ZooKeeperLocal(Properties zkProperties) throws 
FileNotFoundException, IOException, ConfigException {
+        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
+        quorumConfiguration.parseProperties(zkProperties);
+        zooKeeperServer = new MyZooKeeperServerMain(quorumConfiguration);
+
+        new Thread() {
+            public void run() {
+                try {
+                    zooKeeperServer.startup();
+                } catch (IOException e) {
+                    System.out.println("ZooKeeper Failed");
+                    e.printStackTrace(System.err);
+                }
+            }
+        }.start();
+    }
+
+    @Override
+    public void close() throws IOException {
+        zooKeeperServer.shutdown();
+    }
+    
+    static class MyZooKeeperServerMain extends ZooKeeperServerMain {
+
+        private QuorumPeerConfig config;
+
+        MyZooKeeperServerMain(QuorumPeerConfig config) {
+            this.config = config;
+        }
+
+        public void startup() throws IOException {
+            ServerConfig serverConfig = new ServerConfig();
+            serverConfig.readFrom(config);
+            runFromConfig(serverConfig);
+        }
+
+        public void shutdown() {
+            try {
+                super.shutdown();
+            } catch (Exception e) {
+                LOG.error("Error shutting down ZooKeeper", e);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
new file mode 100644
index 0000000..9114d97
--- /dev/null
+++ b/src/test/resources/logback.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<configuration>
+  <!-- Console appender: prints date, level, thread, logger name and message, one event per line. -->
+  <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>%date %level [%thread] %logger %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <!-- Default threshold is WARN; all output goes to the console appender. -->
+  <root level="warn">
+    <appender-ref ref="console"/>
+  </root>
+
+  <!-- Loggers under org.apache.sling log at INFO and above. -->
+  <logger name="org.apache.sling" level="INFO"/>
+  
+</configuration>

Reply via email to