Repository: incubator-rya
Updated Branches:
  refs/heads/master 31e06cb1b -> 9b8162ab7


RYA-377 Kafka implementation of the QueryChangeLog

Refactored serialization to be more abstract.
Changed QueryChange to be serializable.


Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/555a5957
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/555a5957
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/555a5957

Branch: refs/heads/master
Commit: 555a5957e85a627fd6ade3832862930fa50887d8
Parents: 3ccfbad
Author: Andrew Smith <smith...@gmail.com>
Authored: Wed Oct 25 19:01:53 2017 -0400
Committer: caleb <caleb.me...@parsons.com>
Committed: Tue Jan 9 15:12:59 2018 -0500

----------------------------------------------------------------------
 extras/rya.streams/api/pom.xml                  |   5 +
 .../rya/streams/api/queries/ChangeLogEntry.java |   6 +-
 .../rya/streams/api/queries/QueryChange.java    |  10 +-
 .../rya/streams/api/queries/QueryChangeLog.java |   2 +-
 .../kafka/queries/KafkaQueryChangeLog.java      | 158 ++++++++++++++
 .../kafka/serialization/ObjectDeserializer.java |  72 +++++++
 .../kafka/serialization/ObjectSerializer.java   |  73 +++++++
 .../VisibilityBindingSetDeserializer.java       |  36 +---
 .../VisibilityBindingSetSerializer.java         |  35 +---
 .../VisibilityStatementDeserializer.java        |  36 +---
 .../VisibilityStatementSerializer.java          |  35 +---
 .../queries/QueryChangeDeserializer.java        |  38 ++++
 .../serialization/queries/QueryChangeSerde.java |  57 ++++++
 .../queries/QueryChangeSerializer.java          |  39 ++++
 .../kafka/queries/KafkaQueryChangeLogIT.java    | 205 +++++++++++++++++++
 15 files changed, 669 insertions(+), 138 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/api/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.streams/api/pom.xml b/extras/rya.streams/api/pom.xml
index 13716de..2a1f51c 100644
--- a/extras/rya.streams/api/pom.xml
+++ b/extras/rya.streams/api/pom.xml
@@ -49,6 +49,11 @@ under the License.
             <artifactId>guava</artifactId>
         </dependency>
         
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+        
         <!-- Test dependences -->
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java
index 2a5e8a1..d57cee9 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java
@@ -35,7 +35,7 @@ import edu.umd.cs.findbugs.annotations.NonNull;
 @DefaultAnnotation(NonNull.class)
 public class ChangeLogEntry<T> {
 
-    private final int position;
+    private final long position;
     private final T entry;
 
     /**
@@ -44,7 +44,7 @@ public class ChangeLogEntry<T> {
      * @param position - The position of this entry within the change log.
      * @param entry - The value that is stored at this position within the 
change log. (not null)
      */
-    public ChangeLogEntry(final int position, final T entry) {
+    public ChangeLogEntry(final long position, final T entry) {
         this.position = position;
         this.entry = requireNonNull(entry);
     }
@@ -52,7 +52,7 @@ public class ChangeLogEntry<T> {
     /**
      * @return The position of this entry within the change log.
      */
-    public int getPosition() {
+    public long getPosition() {
         return position;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
index 55f87f7..90af79c 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java
@@ -20,10 +20,12 @@ package org.apache.rya.streams.api.queries;
 
 import static java.util.Objects.requireNonNull;
 
+import java.io.Serializable;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.UUID;
 
+import com.google.common.base.Optional;
+
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
 
@@ -33,8 +35,8 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * Immutable.
  */
 @DefaultAnnotation(NonNull.class)
-public final class QueryChange {
-
+public final class QueryChange implements Serializable {
+    private static final long serialVersionUID = 1L;
     private final UUID queryId;
     private final ChangeType changeType;
     private final Optional<String> sparql;
@@ -111,7 +113,7 @@ public final class QueryChange {
      * @return A {@link QueryChange} built using the provided values.
      */
     public static QueryChange delete(final UUID queryId) {
-        return new QueryChange(queryId, ChangeType.DELETE, Optional.empty());
+        return new QueryChange(queryId, ChangeType.DELETE, Optional.absent());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
index ba0e878..824eebc 100644
--- 
a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
+++ 
b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java
@@ -51,7 +51,7 @@ public interface QueryChangeLog {
      * @return The entries that are at and after the specified position.
      * @throws QueryChangeLogException The entries could not be fetched.
      */
-    public CloseableIteration<ChangeLogEntry<QueryChange>, 
QueryChangeLogException> readFromPosition(int position) throws 
QueryChangeLogException;
+    public CloseableIteration<ChangeLogEntry<QueryChange>, 
QueryChangeLogException> readFromPosition(long position) throws 
QueryChangeLogException;
 
     /**
      * One of the {@link QueryChangeLog} functions failed.

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
new file mode 100644
index 0000000..19622ae
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.queries;
+
+import static java.util.Objects.requireNonNull;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.api.queries.QueryChangeLog;
+
+import com.google.common.collect.Lists;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * A Kafka implementation of a {@link QueryChangeLog}.
+ */
+@DefaultAnnotation(NonNull.class)
+public class KafkaQueryChangeLog implements QueryChangeLog {
+    /*
+     * Key is '?' since you cannot have parallel processing over a sequential
+     * change log, so there is only one partition.
+     */
+    private final Producer<?, QueryChange> producer;
+
+    /*
+     * Key is '?' since you cannot have parallel processing over a sequential
+     * change log, so there is only one partition.
+     */
+    private final Consumer<?, QueryChange> consumer;
+
+    private final String topic;
+
+    /**
+     * Creates a new {@link KafkaQueryChangeLog}.
+     *
+     * @param producer - The producer to use to add {@link QueryChange}s to a 
kafka topic. (not null)
+     * @param consumer - The consumer to use to read {@link QueryChange}s from 
a kafka topic. (not null)
+     * @param topic - The topic on kafka to read/write from. (not null)
+     */
+    public KafkaQueryChangeLog(final Producer<?, QueryChange> producer,
+            final Consumer<?, QueryChange> consumer,
+            final String topic) {
+        this.producer = requireNonNull(producer);
+        this.consumer = requireNonNull(consumer);
+        this.topic = requireNonNull(topic);
+    }
+
+    @Override
+    public void write(final QueryChange newChange) throws 
QueryChangeLogException {
+        requireNonNull(newChange);
+        producer.send(new ProducerRecord<>(topic, newChange));
+    }
+
+    @Override
+    public CloseableIteration<ChangeLogEntry<QueryChange>, 
QueryChangeLogException> readFromStart() throws QueryChangeLogException {
+        final TopicPartition part = new TopicPartition(topic, 0);
+        consumer.assign(Lists.newArrayList(part));
+        consumer.seekToBeginning(Lists.newArrayList(part));
+        return new QueryChangeLogEntryIter(consumer);
+    }
+
+    @Override
+    public CloseableIteration<ChangeLogEntry<QueryChange>, 
QueryChangeLogException> readFromPosition(final long position) throws 
QueryChangeLogException {
+        final TopicPartition part = new TopicPartition(topic, 0);
+        consumer.assign(Lists.newArrayList(part));
+        consumer.seek(part, position);
+        return new QueryChangeLogEntryIter(consumer);
+    }
+
+    /**
+     * A {@link CloseableIteration} to iterate over a consumer's results. Since
+     * the consumer returns in bulk when poll(), a cache of recent polling is
+     * maintained.
+     *
+     * If there are no new results after 3 seconds,
+     * {@link QueryChangeLogEntryIter#hasNext()} will return false.
+     */
+    private class QueryChangeLogEntryIter implements 
CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> {
+        private final Consumer<?, QueryChange> consumer;
+        private Iterator<ChangeLogEntry<QueryChange>> iterCache;
+
+        /**
+         * Creates a new {@link QueryChangeLogEntryIter}.
+         *
+         * @param consumer - The consumer to iterate over. (not null)
+         */
+        public QueryChangeLogEntryIter(final Consumer<?, QueryChange> 
consumer) {
+            this.consumer = requireNonNull(consumer);
+        }
+
+        @Override
+        public boolean hasNext() throws QueryChangeLogException {
+            if (iterCache == null || !iterCache.hasNext()) {
+                populateCache();
+            }
+            return iterCache.hasNext();
+        }
+
+        @Override
+        public ChangeLogEntry<QueryChange> next() throws 
QueryChangeLogException {
+            if (iterCache == null && iterCache.hasNext()) {
+                populateCache();
+            }
+
+            if (iterCache.hasNext()) {
+                return iterCache.next();
+            }
+            throw new QueryChangeLogException("There are no changes in the 
change log.");
+        }
+
+        @Override
+        public void remove() throws QueryChangeLogException {
+        }
+
+        @Override
+        public void close() throws QueryChangeLogException {
+            consumer.unsubscribe();
+        }
+
+        private void populateCache() {
+            final ConsumerRecords<?, QueryChange> records = 
consumer.poll(3000L);
+            final List<ChangeLogEntry<QueryChange>> changes = new 
ArrayList<>();
+            records.forEach(
+                    record -> 
+                        changes.add(new 
ChangeLogEntry<QueryChange>(record.offset(), record.value()))
+                    );
+            iterCache = changes.iterator();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectDeserializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectDeserializer.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectDeserializer.java
new file mode 100644
index 0000000..8a1e50b
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectDeserializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.serialization;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize entities.
+ *
+ * @param T - The type of entity to deserialize.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class ObjectDeserializer<T> implements Deserializer<T> {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ObjectDeserializer.class);
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public T deserialize(final String topic, final byte[] data) {
+        if(data == null || data.length == 0) {
+            // Returning null because that is the contract of this method.
+            return null;
+        }
+
+        try {
+            return ObjectSerialization.deserialize(data, 
getDeserializedClass());
+        } catch (final ClassNotFoundException | ClassCastException | 
IOException e) {
+            log.error("Could not deserialize some data into a " + 
getDeserializedClass().getName() + ". This data will be skipped.", e);
+
+            // Returning null because that is the contract of this method.
+            return null;
+        }
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+
+    /**
+     * @return - Used by the {@link ObjectSerialization#deserialize()} and the 
logger.
+     */
+    protected abstract Class<T> getDeserializedClass();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectSerializer.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectSerializer.java
new file mode 100644
index 0000000..24f7652
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectSerializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.serialization;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize entities using Java
+ * object serialization.
+ *
+ * @param T - The type of entity to serialize.
+ */
+@DefaultAnnotation(NonNull.class)
+public abstract class ObjectSerializer<T> implements Serializer<T> {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ObjectSerializer.class);
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final T data) {
+        if(data == null) {
+            return null;
+        }
+
+        try {
+            return ObjectSerialization.serialize(data);
+        } catch (final IOException e) {
+            log.error("Unable to serialize a " + 
getSerializedClass().getName() + ".", e);
+
+            // Return null when there is an error since that is the contract 
of this method.
+            return null;
+        }
+    }
+
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+
+    /**
+     * @return - The class name of T. This is used for logging purposes.
+     */
+    protected abstract Class<T> getSerializedClass();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java
index 1232ad9..011a311 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java
@@ -18,13 +18,8 @@
  */
 package org.apache.rya.streams.kafka.serialization;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.rya.api.model.VisibilityBindingSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -33,34 +28,9 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * A Kafka {@link Deserializer} that is able to deserialize Java object 
serialized {@link VisibilityBindingSet}s.
  */
 @DefaultAnnotation(NonNull.class)
-public class VisibilityBindingSetDeserializer implements 
Deserializer<VisibilityBindingSet> {
-
-    private static final Logger log = 
LoggerFactory.getLogger(VisibilityBindingSetDeserializer.class);
-
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-        // Nothing to do.
-    }
-
-    @Override
-    public VisibilityBindingSet deserialize(final String topic, final byte[] 
data) {
-        if(data == null || data.length == 0) {
-            // Returning null because that is the contract of this method.
-            return null;
-        }
-
-        try {
-            return ObjectSerialization.deserialize(data, 
VisibilityBindingSet.class);
-        } catch (final ClassNotFoundException | ClassCastException | 
IOException e) {
-            log.error("Could not deserialize some data into a " + 
VisibilityBindingSet.class.getName() + ". This data will be skipped.", e);
-
-            // Returning null because that is the contract of this method.
-            return null;
-        }
-    }
-
+public class VisibilityBindingSetDeserializer extends 
ObjectDeserializer<VisibilityBindingSet> {
     @Override
-    public void close() {
-        // Nothing to do.
+    protected Class<VisibilityBindingSet> getDeserializedClass() {
+        return VisibilityBindingSet.class;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java
index b2acdf2..c9cf36f 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java
@@ -18,13 +18,8 @@
  */
 package org.apache.rya.streams.kafka.serialization;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.rya.api.model.VisibilityBindingSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -33,33 +28,9 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * A Kafka {@link Serializer} that is able to serialize {@link 
VisibilityBinidngSet}s using Java object serialization.
  */
 @DefaultAnnotation(NonNull.class)
-public class VisibilityBindingSetSerializer implements 
Serializer<VisibilityBindingSet> {
-
-    private static final Logger log = 
LoggerFactory.getLogger(VisibilityBindingSetDeserializer.class);
-
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-        // Nothing to do.
-    }
-
-    @Override
-    public byte[] serialize(final String topic, final VisibilityBindingSet 
data) {
-        if(data == null) {
-            return null;
-        }
-
-        try {
-            return ObjectSerialization.serialize(data);
-        } catch (final IOException e) {
-            log.error("Unable to serialize a " + 
VisibilityBindingSet.class.getName() + ".", e);
-
-            // Return null when there is an error since that is the contract 
of this method.
-            return null;
-        }
-    }
-
+public class VisibilityBindingSetSerializer extends 
ObjectSerializer<VisibilityBindingSet> {
     @Override
-    public void close() {
-        // Nothing to do.
+    protected Class<VisibilityBindingSet> getSerializedClass() {
+        return VisibilityBindingSet.class;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java
index c0cd63c..4c03d96 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java
@@ -18,13 +18,8 @@
  */
 package org.apache.rya.streams.kafka.serialization;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.rya.api.model.VisibilityStatement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -33,34 +28,9 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * A Kafka {@link Deserializer} that is able to deserialize Java object 
serialized {@link VisibilityStatement}s.
  */
 @DefaultAnnotation(NonNull.class)
-public class VisibilityStatementDeserializer implements 
Deserializer<VisibilityStatement> {
-
-    private static final Logger log = 
LoggerFactory.getLogger(VisibilityStatement.class);
-
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-        // Nothing to do.
-    }
-
-    @Override
-    public VisibilityStatement deserialize(final String topic, final byte[] 
data) {
-        if(data == null || data.length == 0) {
-            // Returning null because that is the contract of this method.
-            return null;
-        }
-
-        try {
-            return ObjectSerialization.deserialize(data, 
VisibilityStatement.class);
-        } catch (final ClassNotFoundException | ClassCastException | 
IOException e) {
-            log.error("Could not deserialize some data into a " + 
VisibilityStatement.class.getName() + ". This data will be skipped.", e);
-
-            // Returning null because that is the contract of this method.
-            return null;
-        }
-    }
-
+public class VisibilityStatementDeserializer extends 
ObjectDeserializer<VisibilityStatement> {
     @Override
-    public void close() {
-        // Nothing to do.
+    protected Class<VisibilityStatement> getDeserializedClass() {
+        return VisibilityStatement.class;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java
index c0b526f..2395bf0 100644
--- 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java
@@ -18,13 +18,8 @@
  */
 package org.apache.rya.streams.kafka.serialization;
 
-import java.io.IOException;
-import java.util.Map;
-
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.rya.api.model.VisibilityStatement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
 import edu.umd.cs.findbugs.annotations.NonNull;
@@ -33,33 +28,9 @@ import edu.umd.cs.findbugs.annotations.NonNull;
  * A Kafka {@link Serializer} that is able to serialize {@link 
VisibilityStatement}s using Java object serialization.
  */
 @DefaultAnnotation(NonNull.class)
-public class VisibilityStatementSerializer implements 
Serializer<VisibilityStatement> {
-
-    private static final Logger log = 
LoggerFactory.getLogger(VisibilityStatementSerializer.class);
-
-    @Override
-    public void configure(final Map<String, ?> configs, final boolean isKey) {
-        // Nothing to do.
-    }
-
-    @Override
-    public byte[] serialize(final String topic, final VisibilityStatement 
data) {
-        if(data == null) {
-            return null;
-        }
-
-        try {
-            return ObjectSerialization.serialize(data);
-        } catch (final IOException e) {
-            log.error("Unable to serialize a " + 
VisibilityStatement.class.getName() + ".", e);
-
-            // Return null when there is an error since that is the contract 
of this method.
-            return null;
-        }
-    }
-
+public class VisibilityStatementSerializer extends 
ObjectSerializer<VisibilityStatement> {
     @Override
-    public void close() {
-        // Nothing to do.
+    protected Class<VisibilityStatement> getSerializedClass() {
+        return VisibilityStatement.class;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeDeserializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeDeserializer.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeDeserializer.java
new file mode 100644
index 0000000..96538b2
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeDeserializer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.serialization.queries;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.kafka.serialization.ObjectDeserializer;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Deserializer} that is able to deserialize Java object 
serialized {@link QueryChange}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryChangeDeserializer extends ObjectDeserializer<QueryChange> {
+
+    @Override
+    protected Class<QueryChange> getDeserializedClass() {
+        return QueryChange.class;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerde.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerde.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerde.java
new file mode 100644
index 0000000..c2e0469
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerde.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.serialization.queries;
+
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.streams.api.queries.QueryChange;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * Provides a {@link Serializer} and {@link Deserializer} for
+ * {@link QueryChange}s.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryChangeSerde implements Serde<QueryChange> {
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        // Nothing to do.
+    }
+
+    @Override
+    public Serializer<QueryChange> serializer() {
+        return new QueryChangeSerializer();
+    }
+
+    @Override
+    public Deserializer<QueryChange> deserializer() {
+        return new QueryChangeDeserializer();
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerializer.java
 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerializer.java
new file mode 100644
index 0000000..8a36680
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerializer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.serialization.queries;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.streams.api.queries.QueryChange;
+import org.apache.rya.streams.kafka.serialization.ObjectSerializer;
+
+import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
+import edu.umd.cs.findbugs.annotations.NonNull;
+
+/**
+ * A Kafka {@link Serializer} that is able to serialize {@link QueryChange}s
+ * using Java object serialization.
+ */
+@DefaultAnnotation(NonNull.class)
+public class QueryChangeSerializer extends ObjectSerializer<QueryChange> {
+
+    @Override
+    protected Class<QueryChange> getSerializedClass() {
+        return QueryChange.class;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
new file mode 100644
index 0000000..9e89ca7
--- /dev/null
+++ 
b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.streams.kafka.queries;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.rya.streams.api.queries.ChangeLogEntry;
+import org.apache.rya.streams.api.queries.QueryChange;
+import 
org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException;
+import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer;
+import 
org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer;
+import org.apache.rya.test.kafka.KafkaITBase;
+import org.apache.rya.test.kafka.KafkaTestInstanceRule;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import info.aduna.iteration.CloseableIteration;
+
+/**
+ * Integration tests the {@link KafkaQueryChangeLog}.
+ */
+public class KafkaQueryChangeLogIT extends KafkaITBase {
+    KafkaQueryChangeLog changeLog;
+
+    private Producer<?, QueryChange> producer;
+    private Consumer<?, QueryChange> consumer;
+
+    private String topic;
+
+    @Rule
+    public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true);
+
+    @Before
+    public void setup() {
+        topic = rule.getKafkaTopicName();
+        final Properties producerProperties = 
rule.createBootstrapServerConfig();
+        
producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
+        
producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
QueryChangeSerializer.class.getName());
+
+        final Properties consumerProperties = 
rule.createBootstrapServerConfig();
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
UUID.randomUUID().toString());
+        
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        
consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName());
+        
consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
QueryChangeDeserializer.class.getName());
+        producer = new KafkaProducer<>(producerProperties);
+        consumer = new KafkaConsumer<>(consumerProperties);
+        changeLog = new KafkaQueryChangeLog(producer, consumer, topic);
+    }
+
+    @After
+    public void cleanup() {
+        producer.flush();
+        producer.close();
+
+        consumer.close();
+    }
+
+    @Test
+    public void testWrite() throws Exception {
+        final String sparql = "SOME QUERY HERE";
+        final UUID uuid = UUID.randomUUID();
+        final QueryChange newChange = QueryChange.create(uuid, sparql);
+        changeLog.write(newChange);
+
+        consumer.subscribe(Lists.newArrayList(topic));
+        final ConsumerRecords<?, QueryChange> records = consumer.poll(2000);
+        assertEquals(1, records.count());
+
+        final QueryChange record = records.iterator().next().value();
+        assertEquals(newChange, record);
+    }
+
+    @Test
+    public void readFromBegining() throws Exception {
+        final List<QueryChange> expected = write10ChangesToChangeLog();
+
+        final CloseableIteration<ChangeLogEntry<QueryChange>, 
QueryChangeLogException> iter = changeLog.readFromStart();
+
+        final List<QueryChange> actual = new ArrayList<>();
+        while (iter.hasNext()) {
+            final ChangeLogEntry<QueryChange> entry = iter.next();
+            actual.add(entry.getEntry());
+        }
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void readFromBegining_positionStartsNotBegining() throws Exception {
+        final List<QueryChange> expected = write10ChangesToChangeLog();
+
+        // set the position to some non-0 position
+        final TopicPartition partition = new TopicPartition(topic, 0);
+        consumer.assign(Lists.newArrayList(partition));
+        consumer.seek(partition, 5L);
+        final CloseableIteration<ChangeLogEntry<QueryChange>, 
QueryChangeLogException> iter = changeLog.readFromStart();
+
+        final List<QueryChange> actual = new ArrayList<>();
+        while (iter.hasNext()) {
+            final ChangeLogEntry<QueryChange> entry = iter.next();
+            actual.add(entry.getEntry());
+        }
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void readFromPosition_positionStartsBegining() throws Exception {
+        final List<QueryChange> expected = 
write10ChangesToChangeLog().subList(5, 10);
+
+        // set the position to some non-0 position
+        final TopicPartition partition = new TopicPartition(topic, 0);
+        consumer.assign(Lists.newArrayList(partition));
+        consumer.seekToBeginning(Lists.newArrayList(partition));
+        final CloseableIteration<ChangeLogEntry<QueryChange>, 
QueryChangeLogException> iter = changeLog.readFromPosition(5L);
+
+        final List<QueryChange> actual = new ArrayList<>();
+        while (iter.hasNext()) {
+            final ChangeLogEntry<QueryChange> entry = iter.next();
+            actual.add(entry.getEntry());
+        }
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void readFromPosition_positionStartsNotBegining() throws Exception {
+        final List<QueryChange> expected = 
write10ChangesToChangeLog().subList(5, 10);
+
+        // set the position to some non-0 position
+        final TopicPartition partition = new TopicPartition(topic, 0);
+        consumer.assign(Lists.newArrayList(partition));
+        consumer.seekToEnd(Lists.newArrayList(partition));
+        final CloseableIteration<ChangeLogEntry<QueryChange>, 
QueryChangeLogException> iter = changeLog.readFromPosition(5L);
+
+        final List<QueryChange> actual = new ArrayList<>();
+        while (iter.hasNext()) {
+            final ChangeLogEntry<QueryChange> entry = iter.next();
+            actual.add(entry.getEntry());
+        }
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void readFromPosition_positionStartsEnd() throws Exception {
+        write10ChangesToChangeLog();
+
+        // set the position to some non-0 position
+        final TopicPartition partition = new TopicPartition(topic, 0);
+        consumer.assign(Lists.newArrayList(partition));
+        consumer.seekToEnd(Lists.newArrayList(partition));
+        final CloseableIteration<ChangeLogEntry<QueryChange>, 
QueryChangeLogException> iter = changeLog.readFromPosition(10L);
+        int count = 0;
+        while (iter.hasNext()) {
+            // should be empty
+            iter.next();
+            count++;
+        }
+        assertEquals(0, count);
+    }
+
+    private List<QueryChange> write10ChangesToChangeLog() throws Exception {
+        final List<QueryChange> changes = new ArrayList<>();
+        for (int ii = 0; ii < 10; ii++) {
+            final String sparql = "SOME QUERY HERE_" + ii;
+            final UUID uuid = UUID.randomUUID();
+            final QueryChange newChange = QueryChange.create(uuid, sparql);
+            changeLog.write(newChange);
+            changes.add(newChange);
+        }
+        return changes;
+    }
+}


Reply via email to