Repository: incubator-rya Updated Branches: refs/heads/master 31e06cb1b -> 9b8162ab7
RYA-377 Kafka implementation of the QueryChangeLog Refactored serialization to be more abstract. Changed QueryChange to be serializable. Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/555a5957 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/555a5957 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/555a5957 Branch: refs/heads/master Commit: 555a5957e85a627fd6ade3832862930fa50887d8 Parents: 3ccfbad Author: Andrew Smith <smith...@gmail.com> Authored: Wed Oct 25 19:01:53 2017 -0400 Committer: caleb <caleb.me...@parsons.com> Committed: Tue Jan 9 15:12:59 2018 -0500 ---------------------------------------------------------------------- extras/rya.streams/api/pom.xml | 5 + .../rya/streams/api/queries/ChangeLogEntry.java | 6 +- .../rya/streams/api/queries/QueryChange.java | 10 +- .../rya/streams/api/queries/QueryChangeLog.java | 2 +- .../kafka/queries/KafkaQueryChangeLog.java | 158 ++++++++++++++ .../kafka/serialization/ObjectDeserializer.java | 72 +++++++ .../kafka/serialization/ObjectSerializer.java | 73 +++++++ .../VisibilityBindingSetDeserializer.java | 36 +--- .../VisibilityBindingSetSerializer.java | 35 +--- .../VisibilityStatementDeserializer.java | 36 +--- .../VisibilityStatementSerializer.java | 35 +--- .../queries/QueryChangeDeserializer.java | 38 ++++ .../serialization/queries/QueryChangeSerde.java | 57 ++++++ .../queries/QueryChangeSerializer.java | 39 ++++ .../kafka/queries/KafkaQueryChangeLogIT.java | 205 +++++++++++++++++++ 15 files changed, 669 insertions(+), 138 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/pom.xml b/extras/rya.streams/api/pom.xml index 13716de..2a1f51c 100644 --- 
a/extras/rya.streams/api/pom.xml +++ b/extras/rya.streams/api/pom.xml @@ -49,6 +49,11 @@ under the License. <artifactId>guava</artifactId> </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <!-- Test dependences --> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java index 2a5e8a1..d57cee9 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/ChangeLogEntry.java @@ -35,7 +35,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; @DefaultAnnotation(NonNull.class) public class ChangeLogEntry<T> { - private final int position; + private final long position; private final T entry; /** @@ -44,7 +44,7 @@ public class ChangeLogEntry<T> { * @param position - The position of this entry within the change log. * @param entry - The value that is stored at this position within the change log. (not null) */ - public ChangeLogEntry(final int position, final T entry) { + public ChangeLogEntry(final long position, final T entry) { this.position = position; this.entry = requireNonNull(entry); } @@ -52,7 +52,7 @@ public class ChangeLogEntry<T> { /** * @return The position of this entry within the change log. 
*/ - public int getPosition() { + public long getPosition() { return position; } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java index 55f87f7..90af79c 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChange.java @@ -20,10 +20,12 @@ package org.apache.rya.streams.api.queries; import static java.util.Objects.requireNonNull; +import java.io.Serializable; import java.util.Objects; -import java.util.Optional; import java.util.UUID; +import com.google.common.base.Optional; + import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -33,8 +35,8 @@ import edu.umd.cs.findbugs.annotations.NonNull; * Immutable. */ @DefaultAnnotation(NonNull.class) -public final class QueryChange { - +public final class QueryChange implements Serializable { + private static final long serialVersionUID = 1L; private final UUID queryId; private final ChangeType changeType; private final Optional<String> sparql; @@ -111,7 +113,7 @@ public final class QueryChange { * @return A {@link QueryChange} built using the provided values. 
*/ public static QueryChange delete(final UUID queryId) { - return new QueryChange(queryId, ChangeType.DELETE, Optional.empty()); + return new QueryChange(queryId, ChangeType.DELETE, Optional.absent()); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java index ba0e878..824eebc 100644 --- a/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java +++ b/extras/rya.streams/api/src/main/java/org/apache/rya/streams/api/queries/QueryChangeLog.java @@ -51,7 +51,7 @@ public interface QueryChangeLog { * @return The entries that are at and after the specified position. * @throws QueryChangeLogException The entries could not be fetched. */ - public CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> readFromPosition(int position) throws QueryChangeLogException; + public CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> readFromPosition(long position) throws QueryChangeLogException; /** * One of the {@link QueryChangeLog} functions failed. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java new file mode 100644 index 0000000..19622ae --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLog.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.queries; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog; + +import com.google.common.collect.Lists; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import info.aduna.iteration.CloseableIteration; + +/** + * A Kafka implementation of a {@link QueryChangeLog}. + */ +@DefaultAnnotation(NonNull.class) +public class KafkaQueryChangeLog implements QueryChangeLog { + /* + * Key is '?' since you cannot have parallel processing over a sequential + * change log, so there is only one partition. + */ + private final Producer<?, QueryChange> producer; + + /* + * Key is '?' since you cannot have parallel processing over a sequential + * change log, so there is only one partition. + */ + private final Consumer<?, QueryChange> consumer; + + private final String topic; + + /** + * Creates a new {@link KafkaQueryChangeLog}. + * + * @param producer - The producer to use to add {@link QueryChange}s to a kafka topic. (not null) + * @param consumer - The consumer to use to read {@link QueryChange}s from a kafka topic. (not null) + * @param topic - The topic on kafka to read/write from. 
(not null) + */ + public KafkaQueryChangeLog(final Producer<?, QueryChange> producer, + final Consumer<?, QueryChange> consumer, + final String topic) { + this.producer = requireNonNull(producer); + this.consumer = requireNonNull(consumer); + this.topic = requireNonNull(topic); + } + + @Override + public void write(final QueryChange newChange) throws QueryChangeLogException { + requireNonNull(newChange); + producer.send(new ProducerRecord<>(topic, newChange)); + } + + @Override + public CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> readFromStart() throws QueryChangeLogException { + final TopicPartition part = new TopicPartition(topic, 0); + consumer.assign(Lists.newArrayList(part)); + consumer.seekToBeginning(Lists.newArrayList(part)); + return new QueryChangeLogEntryIter(consumer); + } + + @Override + public CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> readFromPosition(final long position) throws QueryChangeLogException { + final TopicPartition part = new TopicPartition(topic, 0); + consumer.assign(Lists.newArrayList(part)); + consumer.seek(part, position); + return new QueryChangeLogEntryIter(consumer); + } + + /** + * A {@link CloseableIteration} to iterate over a consumer's results. Since + * the consumer returns in bulk when poll(), a cache of recent polling is + * maintained. + * + * If there are no new results after 3 seconds, + * {@link QueryChangeLogEntryIter#hasNext()} will return false. + */ + private class QueryChangeLogEntryIter implements CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> { + private final Consumer<?, QueryChange> consumer; + private Iterator<ChangeLogEntry<QueryChange>> iterCache; + + /** + * Creates a new {@link QueryChangeLogEntryIter}. + * + * @param consumer - The consumer to iterate over. 
(not null) + */ + public QueryChangeLogEntryIter(final Consumer<?, QueryChange> consumer) { + this.consumer = requireNonNull(consumer); + } + + @Override + public boolean hasNext() throws QueryChangeLogException { + if (iterCache == null || !iterCache.hasNext()) { + populateCache(); + } + return iterCache.hasNext(); + } + + @Override + public ChangeLogEntry<QueryChange> next() throws QueryChangeLogException { + if (iterCache == null || !iterCache.hasNext()) { /* Poll again when the cache was never filled or has been drained, mirroring hasNext(). */ + populateCache(); + } + + if (iterCache.hasNext()) { + return iterCache.next(); + } + throw new QueryChangeLogException("There are no changes in the change log."); + } + + @Override + public void remove() throws QueryChangeLogException { + } + + @Override + public void close() throws QueryChangeLogException { + consumer.unsubscribe(); + } + + private void populateCache() { + final ConsumerRecords<?, QueryChange> records = consumer.poll(3000L); + final List<ChangeLogEntry<QueryChange>> changes = new ArrayList<>(); + records.forEach( + record -> + changes.add(new ChangeLogEntry<QueryChange>(record.offset(), record.value())) + ); + iterCache = changes.iterator(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectDeserializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectDeserializer.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectDeserializer.java new file mode 100644 index 0000000..8a1e50b --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectDeserializer.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.serialization; + +import java.io.IOException; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Deserializer} that is able to deserialize entities. + * + * @param T - The type of entity to deserialize. + */ +@DefaultAnnotation(NonNull.class) +public abstract class ObjectDeserializer<T> implements Deserializer<T> { + + private static final Logger log = LoggerFactory.getLogger(ObjectDeserializer.class); + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + // Nothing to do. + } + + @Override + public T deserialize(final String topic, final byte[] data) { + if(data == null || data.length == 0) { + // Returning null because that is the contract of this method. + return null; + } + + try { + return ObjectSerialization.deserialize(data, getDeserializedClass()); + } catch (final ClassNotFoundException | ClassCastException | IOException e) { + log.error("Could not deserialize some data into a " + getDeserializedClass().getName() + ". 
This data will be skipped.", e); + + // Returning null because that is the contract of this method. + return null; + } + } + + @Override + public void close() { + // Nothing to do. + } + + /** + * @return - Used by the {@link ObjectSerialization#deserialize()} and the logger. + */ + protected abstract Class<T> getDeserializedClass(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectSerializer.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectSerializer.java new file mode 100644 index 0000000..24f7652 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/ObjectSerializer.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.serialization; + +import java.io.IOException; +import java.util.Map; + +import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize entities using Java + * object serialization. + * + * @param T - The type of entity to serialize. + */ +@DefaultAnnotation(NonNull.class) +public abstract class ObjectSerializer<T> implements Serializer<T> { + + private static final Logger log = LoggerFactory.getLogger(ObjectSerializer.class); + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + // Nothing to do. + } + + @Override + public byte[] serialize(final String topic, final T data) { + if(data == null) { + return null; + } + + try { + return ObjectSerialization.serialize(data); + } catch (final IOException e) { + log.error("Unable to serialize a " + getSerializedClass().getName() + ".", e); + + // Return null when there is an error since that is the contract of this method. + return null; + } + } + + + @Override + public void close() { + // Nothing to do. + } + + /** + * @return - The class name of T. This is used for logging purposes. 
+ */ + protected abstract Class<T> getSerializedClass(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java index 1232ad9..011a311 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetDeserializer.java @@ -18,13 +18,8 @@ */ package org.apache.rya.streams.kafka.serialization; -import java.io.IOException; -import java.util.Map; - import org.apache.kafka.common.serialization.Deserializer; import org.apache.rya.api.model.VisibilityBindingSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -33,34 +28,9 @@ import edu.umd.cs.findbugs.annotations.NonNull; * A Kafka {@link Deserializer} that is able to deserialize Java object serialized {@link VisibilityBindingSet}s. */ @DefaultAnnotation(NonNull.class) -public class VisibilityBindingSetDeserializer implements Deserializer<VisibilityBindingSet> { - - private static final Logger log = LoggerFactory.getLogger(VisibilityBindingSetDeserializer.class); - - @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - // Nothing to do. 
- } - - @Override - public VisibilityBindingSet deserialize(final String topic, final byte[] data) { - if(data == null || data.length == 0) { - // Returning null because that is the contract of this method. - return null; - } - - try { - return ObjectSerialization.deserialize(data, VisibilityBindingSet.class); - } catch (final ClassNotFoundException | ClassCastException | IOException e) { - log.error("Could not deserialize some data into a " + VisibilityBindingSet.class.getName() + ". This data will be skipped.", e); - - // Returning null because that is the contract of this method. - return null; - } - } - +public class VisibilityBindingSetDeserializer extends ObjectDeserializer<VisibilityBindingSet> { @Override - public void close() { - // Nothing to do. + protected Class<VisibilityBindingSet> getDeserializedClass() { + return VisibilityBindingSet.class; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java index b2acdf2..c9cf36f 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityBindingSetSerializer.java @@ -18,13 +18,8 @@ */ package org.apache.rya.streams.kafka.serialization; -import java.io.IOException; -import java.util.Map; - import org.apache.kafka.common.serialization.Serializer; import org.apache.rya.api.model.VisibilityBindingSet; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import 
edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -33,33 +28,9 @@ import edu.umd.cs.findbugs.annotations.NonNull; * A Kafka {@link Serializer} that is able to serialize {@link VisibilityBinidngSet}s using Java object serialization. */ @DefaultAnnotation(NonNull.class) -public class VisibilityBindingSetSerializer implements Serializer<VisibilityBindingSet> { - - private static final Logger log = LoggerFactory.getLogger(VisibilityBindingSetDeserializer.class); - - @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - // Nothing to do. - } - - @Override - public byte[] serialize(final String topic, final VisibilityBindingSet data) { - if(data == null) { - return null; - } - - try { - return ObjectSerialization.serialize(data); - } catch (final IOException e) { - log.error("Unable to serialize a " + VisibilityBindingSet.class.getName() + ".", e); - - // Return null when there is an error since that is the contract of this method. - return null; - } - } - +public class VisibilityBindingSetSerializer extends ObjectSerializer<VisibilityBindingSet> { @Override - public void close() { - // Nothing to do. 
+ protected Class<VisibilityBindingSet> getSerializedClass() { + return VisibilityBindingSet.class; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java index c0cd63c..4c03d96 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementDeserializer.java @@ -18,13 +18,8 @@ */ package org.apache.rya.streams.kafka.serialization; -import java.io.IOException; -import java.util.Map; - import org.apache.kafka.common.serialization.Deserializer; import org.apache.rya.api.model.VisibilityStatement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -33,34 +28,9 @@ import edu.umd.cs.findbugs.annotations.NonNull; * A Kafka {@link Deserializer} that is able to deserialize Java object serialized {@link VisibilityStatement}s. */ @DefaultAnnotation(NonNull.class) -public class VisibilityStatementDeserializer implements Deserializer<VisibilityStatement> { - - private static final Logger log = LoggerFactory.getLogger(VisibilityStatement.class); - - @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - // Nothing to do. 
- } - - @Override - public VisibilityStatement deserialize(final String topic, final byte[] data) { - if(data == null || data.length == 0) { - // Returning null because that is the contract of this method. - return null; - } - - try { - return ObjectSerialization.deserialize(data, VisibilityStatement.class); - } catch (final ClassNotFoundException | ClassCastException | IOException e) { - log.error("Could not deserialize some data into a " + VisibilityStatement.class.getName() + ". This data will be skipped.", e); - - // Returning null because that is the contract of this method. - return null; - } - } - +public class VisibilityStatementDeserializer extends ObjectDeserializer<VisibilityStatement> { @Override - public void close() { - // Nothing to do. + protected Class<VisibilityStatement> getDeserializedClass() { + return VisibilityStatement.class; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java index c0b526f..2395bf0 100644 --- a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/VisibilityStatementSerializer.java @@ -18,13 +18,8 @@ */ package org.apache.rya.streams.kafka.serialization; -import java.io.IOException; -import java.util.Map; - import org.apache.kafka.common.serialization.Serializer; import org.apache.rya.api.model.VisibilityStatement; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import 
edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -33,33 +28,9 @@ import edu.umd.cs.findbugs.annotations.NonNull; * A Kafka {@link Serializer} that is able to serialize {@link VisibilityStatement}s using Java object serialization. */ @DefaultAnnotation(NonNull.class) -public class VisibilityStatementSerializer implements Serializer<VisibilityStatement> { - - private static final Logger log = LoggerFactory.getLogger(VisibilityStatementSerializer.class); - - @Override - public void configure(final Map<String, ?> configs, final boolean isKey) { - // Nothing to do. - } - - @Override - public byte[] serialize(final String topic, final VisibilityStatement data) { - if(data == null) { - return null; - } - - try { - return ObjectSerialization.serialize(data); - } catch (final IOException e) { - log.error("Unable to serialize a " + VisibilityStatement.class.getName() + ".", e); - - // Return null when there is an error since that is the contract of this method. - return null; - } - } - +public class VisibilityStatementSerializer extends ObjectSerializer<VisibilityStatement> { @Override - public void close() { - // Nothing to do. 
+ protected Class<VisibilityStatement> getSerializedClass() { + return VisibilityStatement.class; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeDeserializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeDeserializer.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeDeserializer.java new file mode 100644 index 0000000..96538b2 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeDeserializer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.serialization.queries; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.kafka.serialization.ObjectDeserializer; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Deserializer} that is able to deserialize Java object serialized {@link QueryChange}s. + */ +@DefaultAnnotation(NonNull.class) +public class QueryChangeDeserializer extends ObjectDeserializer<QueryChange> { + + @Override + protected Class<QueryChange> getDeserializedClass() { + return QueryChange.class; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerde.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerde.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerde.java new file mode 100644 index 0000000..c2e0469 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerde.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.serialization.queries; + +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.rya.streams.api.queries.QueryChange; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Provides a {@link Serializer} and {@link Deserializer} for + * {@link QueryChange}s. + */ +@DefaultAnnotation(NonNull.class) +public class QueryChangeSerde implements Serde<QueryChange> { + + /** No-op: neither {@code configs} nor {@code isKey} is read. */ @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + // Nothing to do. + } + + /** Returns a newly constructed {@link QueryChangeSerializer} on every call. */ @Override + public Serializer<QueryChange> serializer() { + return new QueryChangeSerializer(); + } + + /** Returns a newly constructed {@link QueryChangeDeserializer} on every call. */ @Override + public Deserializer<QueryChange> deserializer() { + return new QueryChangeDeserializer(); + } + + /** No-op: this class holds no state or resources of its own. */ @Override + public void close() { + // Nothing to do. 
+ } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerializer.java b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerializer.java new file mode 100644 index 0000000..8a36680 --- /dev/null +++ b/extras/rya.streams/kafka/src/main/java/org/apache/rya/streams/kafka/serialization/queries/QueryChangeSerializer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.streams.kafka.serialization.queries; + +import org.apache.kafka.common.serialization.Serializer; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.kafka.serialization.ObjectSerializer; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A Kafka {@link Serializer} that is able to serialize {@link QueryChange}s + * using Java object serialization. + */ +@DefaultAnnotation(NonNull.class) +public class QueryChangeSerializer extends ObjectSerializer<QueryChange> { + + /** Returns the {@link QueryChange} class token. NOTE(review): presumably consumed by {@link ObjectSerializer} to identify the type being serialized; confirm against the parent class. */ @Override + protected Class<QueryChange> getSerializedClass() { + return QueryChange.class; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/555a5957/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java new file mode 100644 index 0000000..9e89ca7 --- /dev/null +++ b/extras/rya.streams/kafka/src/test/java/org/apache/rya/streams/kafka/queries/KafkaQueryChangeLogIT.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.streams.kafka.queries; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.streams.api.queries.ChangeLogEntry; +import org.apache.rya.streams.api.queries.QueryChange; +import org.apache.rya.streams.api.queries.QueryChangeLog.QueryChangeLogException; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeDeserializer; +import org.apache.rya.streams.kafka.serialization.queries.QueryChangeSerializer; +import org.apache.rya.test.kafka.KafkaITBase; +import org.apache.rya.test.kafka.KafkaTestInstanceRule; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import info.aduna.iteration.CloseableIteration; + +/** + * Integration tests the {@link KafkaQueryChangeLog}. 
+ */ +public class KafkaQueryChangeLogIT extends KafkaITBase { + /** Change log under test; rebuilt in {@link #setup()} around the producer and consumer declared below. */ KafkaQueryChangeLog changeLog; + + /** Kafka clients handed to the change log; the consumer is also driven directly by the tests. */ private Producer<?, QueryChange> producer; + private Consumer<?, QueryChange> consumer; + + /** Name of the topic used by the current test, taken from the rule in {@link #setup()}. */ private String topic; + + /** NOTE(review): presumably provides the embedded Kafka instance and, given the {@code true} argument, creates the test topic - confirm against KafkaTestInstanceRule. */ @Rule + public KafkaTestInstanceRule rule = new KafkaTestInstanceRule(true); + + /** Builds a String-keyed, QueryChange-valued producer and consumer from the rule's bootstrap config and wraps them in the change log under test. The consumer uses a random group id and the "earliest" offset reset policy. */ @Before + public void setup() { + topic = rule.getKafkaTopicName(); + final Properties producerProperties = rule.createBootstrapServerConfig(); + producerProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + producerProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, QueryChangeSerializer.class.getName()); + + final Properties consumerProperties = rule.createBootstrapServerConfig(); + consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); + consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, QueryChangeDeserializer.class.getName()); + producer = new KafkaProducer<>(producerProperties); + consumer = new KafkaConsumer<>(consumerProperties); + changeLog = new KafkaQueryChangeLog(producer, consumer, topic); + } + + /** Flushes and closes the producer, then closes the consumer. */ @After + public void cleanup() { + producer.flush(); + producer.close(); + + consumer.close(); + } + + /** Writes one change through the change log, then subscribes and polls the topic directly and expects exactly that one change back. */ @Test + public void testWrite() throws Exception { + final String sparql = "SOME QUERY HERE"; + final UUID uuid = UUID.randomUUID(); + final QueryChange newChange = QueryChange.create(uuid, sparql); + changeLog.write(newChange); + + consumer.subscribe(Lists.newArrayList(topic)); + final ConsumerRecords<?, QueryChange> records = consumer.poll(2000); + assertEquals(1, records.count()); + + final QueryChange record = records.iterator().next().value(); + assertEquals(newChange, record); + } + + /** Expects {@code readFromStart()} to return all ten written changes in write order. NOTE(review): the returned CloseableIteration is never closed by this test. */ @Test + public void readFromBegining()
throws Exception { + final List<QueryChange> expected = write10ChangesToChangeLog(); + + final CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> iter = changeLog.readFromStart(); + + final List<QueryChange> actual = new ArrayList<>(); + while (iter.hasNext()) { + final ChangeLogEntry<QueryChange> entry = iter.next(); + actual.add(entry.getEntry()); + } + assertEquals(expected, actual); + } + + /** Expects {@code readFromStart()} to return all ten written changes even though the consumer was first positioned at offset 5. */ @Test + public void readFromBegining_positionStartsNotBegining() throws Exception { + final List<QueryChange> expected = write10ChangesToChangeLog(); + + // set the position to some non-0 position + final TopicPartition partition = new TopicPartition(topic, 0); + consumer.assign(Lists.newArrayList(partition)); + consumer.seek(partition, 5L); + final CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> iter = changeLog.readFromStart(); + + final List<QueryChange> actual = new ArrayList<>(); + while (iter.hasNext()) { + final ChangeLogEntry<QueryChange> entry = iter.next(); + actual.add(entry.getEntry()); + } + assertEquals(expected, actual); + } + + /** Expects {@code readFromPosition(5)} to return only the last five of ten written changes when the consumer starts at the beginning of the partition. */ @Test + public void readFromPosition_positionStartsBegining() throws Exception { + final List<QueryChange> expected = write10ChangesToChangeLog().subList(5, 10); + + // start the consumer at the beginning of the partition + final TopicPartition partition = new TopicPartition(topic, 0); + consumer.assign(Lists.newArrayList(partition)); + consumer.seekToBeginning(Lists.newArrayList(partition)); + final CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> iter = changeLog.readFromPosition(5L); + + final List<QueryChange> actual = new ArrayList<>(); + while (iter.hasNext()) { + final ChangeLogEntry<QueryChange> entry = iter.next(); + actual.add(entry.getEntry()); + } + assertEquals(expected, actual); + } + + /** Expects {@code readFromPosition(5)} to return only the last five of ten written changes when the consumer starts at the end of the partition. */ @Test + public void readFromPosition_positionStartsNotBegining() throws Exception { + final List<QueryChange> expected = write10ChangesToChangeLog().subList(5, 10); + + // start the consumer
at the end of the partition, i.e. away from the beginning + final TopicPartition partition = new TopicPartition(topic, 0); + consumer.assign(Lists.newArrayList(partition)); + consumer.seekToEnd(Lists.newArrayList(partition)); + final CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> iter = changeLog.readFromPosition(5L); + + final List<QueryChange> actual = new ArrayList<>(); + while (iter.hasNext()) { + final ChangeLogEntry<QueryChange> entry = iter.next(); + actual.add(entry.getEntry()); + } + assertEquals(expected, actual); + } + + /** Expects {@code readFromPosition(10)} to yield no entries after ten changes were written. NOTE(review): the consumer setup is identical to readFromPosition_positionStartsNotBegining; only the requested position differs. */ @Test + public void readFromPosition_positionStartsEnd() throws Exception { + write10ChangesToChangeLog(); + + // start the consumer at the end of the partition + final TopicPartition partition = new TopicPartition(topic, 0); + consumer.assign(Lists.newArrayList(partition)); + consumer.seekToEnd(Lists.newArrayList(partition)); + final CloseableIteration<ChangeLogEntry<QueryChange>, QueryChangeLogException> iter = changeLog.readFromPosition(10L); + int count = 0; + while (iter.hasNext()) { + // should be empty + iter.next(); + count++; + } + assertEquals(0, count); + } + + /** Writes ten changes to the change log, each with a random UUID and the text "SOME QUERY HERE_" followed by its index 0-9, and returns them in write order. */ private List<QueryChange> write10ChangesToChangeLog() throws Exception { + final List<QueryChange> changes = new ArrayList<>(); + for (int ii = 0; ii < 10; ii++) { + final String sparql = "SOME QUERY HERE_" + ii; + final UUID uuid = UUID.randomUUID(); + final QueryChange newChange = QueryChange.create(uuid, sparql); + changeLog.write(newChange); + changes.add(newChange); + } + return changes; + } +}