http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java new file mode 100644 index 0000000..bb438be --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/registration/KafkaNotificationRegistrationClient.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.registration; + +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.rya.periodic.notification.api.Notification; +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient; +import org.apache.rya.periodic.notification.notification.BasicNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; + +/** + * Implementation of {@link PeriodicNotificaitonClient} used to register new notification + * requests with the PeriodicQueryService. + * + */ +public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient { + + private KafkaProducer<String, CommandNotification> producer; + private String topic; + + public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) { + this.topic = topic; + this.producer = producer; + } + + @Override + public void addNotification(PeriodicNotification notification) { + processNotification(new CommandNotification(Command.ADD, notification)); + + } + + @Override + public void deleteNotification(BasicNotification notification) { + processNotification(new CommandNotification(Command.DELETE, notification)); + } + + @Override + public void deleteNotification(String notificationId) { + processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId))); + } + + @Override + public void addNotification(String id, long period, long delay, TimeUnit unit) { + Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build(); + processNotification(new CommandNotification(Command.ADD, notification)); + } + + + 
private void processNotification(CommandNotification notification) { + producer.send(new ProducerRecord<String, CommandNotification>(topic, notification.getId(), notification)); + } + + @Override + public void close() { + producer.close(); + } + + +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java new file mode 100644 index 0000000..bd29d29 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.serialization; + +import java.lang.reflect.Type; + +import org.apache.rya.periodic.notification.notification.BasicNotification; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * {@link TypeAdapter} for {@link BasicNotification}s. Used in {@link CommandNotificationTypeAdapter} to + * serialize {@link CommandNotification}s. + * + */ +public class BasicNotificationTypeAdapter implements JsonDeserializer<BasicNotification>, JsonSerializer<BasicNotification> { + + @Override + public JsonElement serialize(BasicNotification arg0, Type arg1, JsonSerializationContext arg2) { + JsonObject result = new JsonObject(); + result.add("id", new JsonPrimitive(arg0.getId())); + return result; + } + + @Override + public BasicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException { + JsonObject json = arg0.getAsJsonObject(); + String id = json.get("id").getAsString(); + return new BasicNotification(id); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java new file mode 100644 index 0000000..50180ad --- /dev/null +++ 
b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.serialization; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Joiner; +import com.google.common.primitives.Bytes; + +/** + * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming messages + * from Kafka. 
+ * + */ +public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> { + + private static final Logger log = Logger.getLogger(BindingSetSerDe.class); + private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer(); + private static final byte[] DELIM_BYTE = "\u0002".getBytes(); + + private byte[] toBytes(BindingSet bindingSet) { + try { + return getBytes(getVarOrder(bindingSet), bindingSet); + } catch(Exception e) { + log.trace("Unable to serialize BindingSet: " + bindingSet); + return new byte[0]; + } + } + + private BindingSet fromBytes(byte[] bsBytes) { + try{ + int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE); + byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex); + byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length); + VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";")); + return getBindingSet(varOrder, bsBytesNoVarOrder); + } catch(Exception e) { + log.trace("Unable to deserialize BindingSet: " + bsBytes); + return new QueryBindingSet(); + } + } + + private VariableOrder getVarOrder(BindingSet bs) { + return new VariableOrder(bs.getBindingNames()); + } + + private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException { + byte[] bsBytes = serializer.convert(bs, varOrder); + String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders()); + byte[] varOrderBytes = varOrderString.getBytes("UTF-8"); + return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes); + } + + private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException { + return serializer.convert(bsBytes, varOrder); + } + + @Override + public BindingSet deserialize(String topic, byte[] bytes) { + return fromBytes(bytes); + } + + @Override + public void close() { + // Do nothing. Nothing to close. 
+ } + + @Override + public void configure(Map<String, ?> arg0, boolean arg1) { + // Do nothing. Nothing to configure. + } + + @Override + public byte[] serialize(String topic, BindingSet bs) { + return toBytes(bs); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java new file mode 100644 index 0000000..302e1be --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.serialization; + +import java.io.UnsupportedEncodingException; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.rya.periodic.notification.api.Notification; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s + * to and from Kafka. + * + */ +public class CommandNotificationSerializer implements Serializer<CommandNotification>, Deserializer<CommandNotification> { + + private static Gson gson = new GsonBuilder() + .registerTypeHierarchyAdapter(Notification.class, new CommandNotificationTypeAdapter()).create(); + private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class); + + @Override + public CommandNotification deserialize(String topic, byte[] bytes) { + String json = null; + try { + json = new String(bytes, "UTF-8"); + } catch (UnsupportedEncodingException e) { + LOG.info("Unable to deserialize notification for topic: " + topic); + } + return gson.fromJson(json, CommandNotification.class); + } + + @Override + public byte[] serialize(String topic, CommandNotification command) { + try { + return gson.toJson(command).getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + LOG.info("Unable to serialize notification: " + command + "for topic: " + topic); + throw new RuntimeException(e); + } + } + + @Override + public void close() { + // Do nothing. Nothing to close + } + + @Override + public void configure(Map<String, ?> arg0, boolean arg1) { + // Do nothing. 
Nothing to configure + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java new file mode 100644 index 0000000..a9fb7e1 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.serialization; + +import java.lang.reflect.Type; + +import org.apache.rya.periodic.notification.api.Notification; +import org.apache.rya.periodic.notification.notification.BasicNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * {@link TypeAdapter} used to serialize and deserialize {@link CommandNotification}s. + * This TypeAdapter is used in {@link CommandNotificationSerializer} for producing and + * consuming messages to and from Kafka. 
+ * + */ +public class CommandNotificationTypeAdapter + implements JsonDeserializer<CommandNotification>, JsonSerializer<CommandNotification> { + + @Override + public JsonElement serialize(CommandNotification arg0, Type arg1, JsonSerializationContext arg2) { + JsonObject result = new JsonObject(); + result.add("command", new JsonPrimitive(arg0.getCommand().name())); + Notification notification = arg0.getNotification(); + if (notification instanceof PeriodicNotification) { + result.add("type", new JsonPrimitive(PeriodicNotification.class.getSimpleName())); + PeriodicNotificationTypeAdapter adapter = new PeriodicNotificationTypeAdapter(); + result.add("notification", + adapter.serialize((PeriodicNotification) notification, PeriodicNotification.class, arg2)); + } else if (notification instanceof BasicNotification) { + result.add("type", new JsonPrimitive(BasicNotification.class.getSimpleName())); + BasicNotificationTypeAdapter adapter = new BasicNotificationTypeAdapter(); + result.add("notification", + adapter.serialize((BasicNotification) notification, BasicNotification.class, arg2)); + } else { + throw new IllegalArgumentException("Invalid notification type."); + } + return result; + } + + @Override + public CommandNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) + throws JsonParseException { + + JsonObject json = arg0.getAsJsonObject(); + Command command = Command.valueOf(json.get("command").getAsString()); + String type = json.get("type").getAsString(); + Notification notification = null; + if (type.equals(PeriodicNotification.class.getSimpleName())) { + notification = (new PeriodicNotificationTypeAdapter()).deserialize(json.get("notification"), + PeriodicNotification.class, arg2); + } else if (type.equals(BasicNotification.class.getSimpleName())) { + notification = (new BasicNotificationTypeAdapter()).deserialize(json.get("notification"), + BasicNotification.class, arg2); + } else { + throw new JsonParseException("Cannot 
deserialize Json"); + } + + return new CommandNotification(command, notification); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java new file mode 100644 index 0000000..fcc0ba2 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.api/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.serialization; + +import java.lang.reflect.Type; +import java.util.concurrent.TimeUnit; + +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.apache.rya.periodic.notification.notification.PeriodicNotification.Builder; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * {@link TypeAdapter} used to serialize and deserialize {@link PeriodicNotification}s. + * This TypeAdapter is used in {@link CommandNotificationTypeAdapter} which is used in + * {@link CommandNotificationSerializer} for producing and consuming messages to and from + * Kafka. + * + */ +public class PeriodicNotificationTypeAdapter + implements JsonSerializer<PeriodicNotification>, JsonDeserializer<PeriodicNotification> { + + @Override + public PeriodicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) + throws JsonParseException { + + JsonObject json = arg0.getAsJsonObject(); + String id = json.get("id").getAsString(); + long period = json.get("period").getAsLong(); + TimeUnit periodTimeUnit = TimeUnit.valueOf(json.get("timeUnit").getAsString()); + long initialDelay = json.get("initialDelay").getAsLong(); + Builder builder = PeriodicNotification.builder().id(id).period(period) + .initialDelay(initialDelay).timeUnit(periodTimeUnit); + + return builder.build(); + } + + @Override + public JsonElement serialize(PeriodicNotification arg0, Type arg1, JsonSerializationContext arg2) { + + JsonObject result = new JsonObject(); + result.add("id", new JsonPrimitive(arg0.getId())); + result.add("period", new JsonPrimitive(arg0.getPeriod())); + result.add("initialDelay", new 
JsonPrimitive(arg0.getInitialDelay())); + result.add("timeUnit", new JsonPrimitive(arg0.getTimeUnit().name())); + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml index 20a0647..402f81d 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml +++ b/extras/rya.periodic.service/periodic.service.integration.tests/pom.xml @@ -1,22 +1,14 @@ <?xml version="1.0" encoding="utf-8"?> -<!-- -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. ---> +<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor + license agreements. See the NOTICE file distributed with this work for additional + information regarding copyright ownership. The ASF licenses this file to + you under the Apache License, Version 2.0 (the "License"); you may not use + this file except in compliance with the License. 
You may obtain a copy of + the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required + by applicable law or agreed to in writing, software distributed under the + License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS + OF ANY KIND, either express or implied. See the License for the specific + language governing permissions and limitations under the License. --> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> @@ -54,7 +46,6 @@ under the License. <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.periodic.service.notification</artifactId> - <version>${project.version}</version> <exclusions> <exclusion> <artifactId>logback-classic</artifactId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java index cb7557c..9109775 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationIT.java @@ -21,7 +21,6 @@ package org.apache.rya.periodic.notification.application; import 
java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.nio.file.Files; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.ArrayList; @@ -34,15 +33,16 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; +import java.util.UUID; import javax.xml.datatype.DatatypeConfigurationException; import javax.xml.datatype.DatatypeFactory; -import org.I0Itec.zkclient.ZkClient; import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.config.FluoConfiguration; import org.apache.fluo.core.client.FluoClientImpl; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -52,21 +52,27 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.kafka.base.EmbeddedKafkaInstance; +import org.apache.rya.kafka.base.EmbeddedKafkaSingleton; +import org.apache.rya.kafka.base.KafkaTestInstanceRule; import org.apache.rya.pcj.fluo.test.base.RyaExportITBase; -import 
org.apache.rya.periodic.notification.api.CreatePeriodicQuery; import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.apache.rya.periodic.notification.registration.kafka.KafkaNotificationRegistrationClient; +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.Value; @@ -81,14 +87,9 @@ import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import kafka.server.KafkaConfig; -import kafka.server.KafkaServer; -import kafka.utils.MockTime; -import kafka.utils.TestUtils; -import kafka.utils.Time; -import kafka.utils.ZKStringSerializer$; -import kafka.utils.ZkUtils; -import kafka.zk.EmbeddedZookeeper; +import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.NOTIFICATION_TOPIC; +import static org.apache.rya.periodic.notification.application.PeriodicNotificationApplicationConfiguration.KAFKA_BOOTSTRAP_SERVERS;; + public class PeriodicNotificationApplicationIT extends RyaExportITBase { @@ -97,45 +98,38 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { private KafkaProducer<String, CommandNotification> producer; private Properties props; private Properties kafkaProps; - PeriodicNotificationApplicationConfiguration conf; + private PeriodicNotificationApplicationConfiguration conf; + private static EmbeddedKafkaInstance embeddedKafka = EmbeddedKafkaSingleton.getInstance(); + private static String bootstrapServers; + + @Rule + public KafkaTestInstanceRule rule = new 
KafkaTestInstanceRule(false); - private static final String ZKHOST = "127.0.0.1"; - private static final String BROKERHOST = "127.0.0.1"; - private static final String BROKERPORT = "9092"; - private ZkUtils zkUtils; - private KafkaServer kafkaServer; - private EmbeddedZookeeper zkServer; - private ZkClient zkClient; + @BeforeClass + public static void initClass() { + bootstrapServers = embeddedKafka.createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + } @Before public void init() throws Exception { - setUpKafka(); + String topic = rule.getKafkaTopicName(); + rule.createTopic(topic); + + //get user specified props and update with the embedded kafka bootstrap servers and rule generated topic props = getProps(); + props.setProperty(NOTIFICATION_TOPIC, topic); + props.setProperty(KAFKA_BOOTSTRAP_SERVERS, bootstrapServers); conf = new PeriodicNotificationApplicationConfiguration(props); + + //create Kafka Producer kafkaProps = getKafkaProperties(conf); - app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props); producer = new KafkaProducer<>(kafkaProps, new StringSerializer(), new CommandNotificationSerializer()); + + //extract kafka specific properties from application config + app = PeriodicNotificationApplicationFactory.getPeriodicApplication(props); registrar = new KafkaNotificationRegistrationClient(conf.getNotificationTopic(), producer); } - private void setUpKafka() throws Exception { - // Setup Kafka. 
- zkServer = new EmbeddedZookeeper(); - final String zkConnect = ZKHOST + ":" + zkServer.port(); - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$); - zkUtils = ZkUtils.apply(zkClient, false); - - // setup Brokersparql - final Properties brokerProps = new Properties(); - brokerProps.setProperty("zookeeper.connect", zkConnect); - brokerProps.setProperty("broker.id", "0"); - brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString()); - brokerProps.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT); - final KafkaConfig config = new KafkaConfig(brokerProps); - final Time mock = new MockTime(); - kafkaServer = TestUtils.createServer(config, mock); - } - @Test public void periodicApplicationWithAggAndGroupByTest() throws Exception { @@ -185,10 +179,10 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { Connector connector = ConfigUtils.getConnector(conf); PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); - String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar); + String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); addData(statements); app.start(); -// + Multimap<Long, BindingSet> actual = HashMultimap.create(); try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { consumer.subscribe(Arrays.asList(id)); @@ -321,10 +315,10 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { Connector connector = ConfigUtils.getConnector(conf); PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); - String id = 
periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar); + String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); addData(statements); app.start(); -// + Multimap<Long, BindingSet> expected = HashMultimap.create(); try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { consumer.subscribe(Arrays.asList(id)); @@ -411,10 +405,10 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { Connector connector = ConfigUtils.getConnector(conf); PeriodicQueryResultStorage storage = new AccumuloPeriodicQueryResultStorage(connector, conf.getTablePrefix()); CreatePeriodicQuery periodicQuery = new CreatePeriodicQuery(fluo, storage); - String id = periodicQuery.createQueryAndRegisterWithKafka(sparql, registrar); + String id = FluoQueryUtils.convertFluoQueryIdToPcjId(periodicQuery.createPeriodicQuery(sparql, registrar).getQueryId()); addData(statements); app.start(); -// + Multimap<Long, BindingSet> expected = HashMultimap.create(); try (KafkaConsumer<String, BindingSet> consumer = new KafkaConsumer<>(kafkaProps, new StringDeserializer(), new BindingSetSerDe())) { consumer.subscribe(Arrays.asList(id)); @@ -458,13 +452,6 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { public void shutdown() { registrar.close(); app.stop(); - teardownKafka(); - } - - private void teardownKafka() { - kafkaServer.shutdown(); - zkClient.close(); - zkServer.shutdown(); } private void addData(Collection<Statement> statements) throws DatatypeConfigurationException { @@ -473,20 +460,17 @@ public class PeriodicNotificationApplicationIT extends RyaExportITBase { InsertTriples inserter = new InsertTriples(); statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x))); getMiniFluo().waitForObservers(); -// FluoITHelper.printFluoTable(fluo); } - } - private Properties 
getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { + private static Properties getKafkaProperties(PeriodicNotificationApplicationConfiguration conf) { Properties kafkaProps = new Properties(); - kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, conf.getBootStrapServers()); - kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, conf.getNotificationClientId()); + kafkaProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + kafkaProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, conf.getNotificationGroupId()); kafkaProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); return kafkaProps; } - private Properties getProps() throws IOException { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java index cf24974..e05ca6f 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/application/PeriodicNotificationProviderIT.java @@ -31,10 +31,11 @@ import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; 
import org.apache.rya.periodic.notification.notification.TimestampedNotification; import org.apache.rya.periodic.notification.recovery.PeriodicNotificationProvider; +import org.junit.Assert; import org.junit.Test; import org.openrdf.query.MalformedQueryException; -import org.junit.Assert; +import com.google.common.collect.Sets; public class PeriodicNotificationProviderIT extends AccumuloExportITBase { @@ -55,7 +56,7 @@ public class PeriodicNotificationProviderIT extends AccumuloExportITBase { String id = null; try(FluoClient fluo = new FluoClientImpl(getFluoConfiguration())) { - id = pcj.createPcj(sparql, fluo).getQueryId(); + id = pcj.createPcj(FluoQueryUtils.createNewPcjId(), sparql, Sets.newHashSet(), fluo).getQueryId(); provider.processRegisteredNotifications(coord, fluo.newSnapshot()); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java index c5dc809..874e7e2 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/exporter/PeriodicNotificationExporterIT.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.kafka.base.KafkaITBase; import 
org.apache.rya.kafka.base.KafkaTestInstanceRule; +import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.serialization.BindingSetSerDe; import org.junit.Assert; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java index fa60e48..21109ae 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/processor/PeriodicNotificationProcessorIT.java @@ -30,8 +30,8 @@ import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.NodeBin; -import org.apache.rya.periodic.notification.exporter.BindingSetRecord; import org.apache.rya.periodic.notification.notification.PeriodicNotification; import org.apache.rya.periodic.notification.notification.TimestampedNotification; import org.junit.Assert; 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java index bb98b7f..830fa46 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java +++ b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/pruner/PeriodicNotificationBinPrunerIT.java @@ -39,9 +39,11 @@ import org.apache.fluo.api.data.ColumnValue; import org.apache.fluo.api.data.Span; import org.apache.fluo.core.client.FluoClientImpl; import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; @@ -50,9 +52,7 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableItera import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import 
org.apache.rya.pcj.fluo.test.base.RyaExportITBase; -import org.apache.rya.periodic.notification.api.CreatePeriodicQuery; import org.apache.rya.periodic.notification.api.NodeBin; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; import org.junit.Assert; import org.junit.Test; import org.openrdf.model.Statement; @@ -85,8 +85,7 @@ public class PeriodicNotificationBinPrunerIT extends RyaExportITBase { PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(super.getAccumuloConnector(), getRyaInstanceName()); CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage); - PeriodicNotification notification = createPeriodicQuery.createPeriodicQuery(sparql); - String queryId = notification.getId(); + String queryId = FluoQueryUtils.convertFluoQueryIdToPcjId(createPeriodicQuery.createPeriodicQuery(sparql).getQueryId()); // create statements to ingest into Fluo final ValueFactory vf = new ValueFactoryImpl(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java index bde406f..522e69d 100644 --- a/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java +++ 
b/extras/rya.periodic.service/periodic.service.integration.tests/src/test/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicCommandNotificationConsumerIT.java @@ -18,31 +18,44 @@ */package org.apache.rya.periodic.notification.registration.kafka; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.log4j.BasicConfigurator; -import org.apache.rya.pcj.fluo.test.base.KafkaExportITBase; +import org.apache.rya.kafka.base.KafkaITBase; +import org.apache.rya.kafka.base.KafkaTestInstanceRule; import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; import org.apache.rya.periodic.notification.notification.CommandNotification; import org.apache.rya.periodic.notification.notification.TimestampedNotification; +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; -import org.junit.After; import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase { +public class PeriodicCommandNotificationConsumerIT extends KafkaITBase { - private static final String topic = "topic"; private KafkaNotificationRegistrationClient registration; private PeriodicNotificationCoordinatorExecutor coord; private KafkaNotificationProvider provider; + private String bootstrapServer; + + @Rule + public 
KafkaTestInstanceRule rule = new KafkaTestInstanceRule(false); + + @Before + public void init() throws Exception { + bootstrapServer = createBootstrapServerConfig().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG); + } @Test public void kafkaNotificationProviderTest() throws InterruptedException { @@ -52,6 +65,9 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase { BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); Properties props = createKafkaConfig(); KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); + String topic = rule.getKafkaTopicName(); + rule.createTopic(topic); + registration = new KafkaNotificationRegistrationClient(topic, producer); coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); @@ -80,6 +96,9 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase { BlockingQueue<TimestampedNotification> notifications = new LinkedBlockingQueue<>(); Properties props = createKafkaConfig(); KafkaProducer<String, CommandNotification> producer = new KafkaProducer<>(props); + String topic = rule.getKafkaTopicName(); + rule.createTopic(topic); + registration = new KafkaNotificationRegistrationClient(topic, producer); coord = new PeriodicNotificationCoordinatorExecutor(1, notifications); provider = new KafkaNotificationProvider(topic, new StringDeserializer(), new CommandNotificationSerializer(), props, coord, 1); @@ -108,8 +127,8 @@ public class PeriodicCommandNotificationConsumerIT extends KafkaExportITBase { private Properties createKafkaConfig() { Properties props = new Properties(); - props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); - props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group0"); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServer); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "consumer0"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/pom.xml b/extras/rya.periodic.service/periodic.service.notification/pom.xml index 2173888..1e59e15 100644 --- a/extras/rya.periodic.service/periodic.service.notification/pom.xml +++ b/extras/rya.periodic.service/periodic.service.notification/pom.xml @@ -1,107 +1,112 @@ <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <!-- Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with this - work for additional information regarding copyright ownership. The ASF licenses - this file to you under the Apache License, Version 2.0 (the "License"); you - may not use this file except in compliance with the License. You may obtain - a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless - required by applicable law or agreed to in writing, software distributed - under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES - OR CONDITIONS OF ANY KIND, either express or implied. See the License for - the specific language governing permissions and limitations under the License. 
--> - <parent> - <groupId>org.apache.rya</groupId> - <artifactId>rya.periodic.service</artifactId> - <version>3.2.11-incubating-SNAPSHOT</version> - </parent> + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <!-- Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with this + work for additional information regarding copyright ownership. The ASF licenses + this file to you under the Apache License, Version 2.0 (the "License"); you + may not use this file except in compliance with the License. You may obtain + a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless + required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. 
--> + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> - <artifactId>rya.periodic.service.notification</artifactId> - - <name>Apache Rya Periodic Service Notification</name> + <artifactId>rya.periodic.service.notification</artifactId> + + <name>Apache Rya Periodic Service Notification</name> <description>Notifications for Rya Periodic Service</description> - <dependencies> + <dependencies> + + <dependency> + <groupId>org.apache.twill</groupId> + <artifactId>twill-api</artifactId> + <version>0.11.0</version> + </dependency> + <dependency> + <groupId>org.apache.twill</groupId> + <artifactId>twill-yarn</artifactId> + <version>0.11.0</version> + <exclusions> + <exclusion> + <artifactId>kafka_2.10</artifactId> + <groupId>org.apache.kafka</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.0</version> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.fluo</groupId> + <artifactId>fluo-core</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + </dependency> + <dependency> + <groupId>org.openrdf.sesame</groupId> + <artifactId>sesame-query</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing.pcj</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.pcj.fluo.app</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service.api</artifactId> + </dependency> - <dependency> - <groupId>org.apache.twill</groupId> - 
<artifactId>twill-api</artifactId> - <version>0.11.0</version> - </dependency> - <dependency> - <groupId>org.apache.twill</groupId> - <artifactId>twill-yarn</artifactId> - <version>0.11.0</version> - <exclusions> - <exclusion> - <artifactId>kafka_2.10</artifactId> - <groupId>org.apache.kafka</groupId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.google.code.gson</groupId> - <artifactId>gson</artifactId> - <version>2.8.0</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-api</artifactId> - </dependency> - <dependency> - <groupId>org.apache.fluo</groupId> - <artifactId>fluo-core</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.indexing</artifactId> - </dependency> - <dependency> - <groupId>org.openrdf.sesame</groupId> - <artifactId>sesame-query</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.indexing.pcj</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rya</groupId> - <artifactId>rya.pcj.fluo.app</artifactId> - </dependency> - </dependencies> + </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-compiler-plugin</artifactId> - <configuration> - <encoding>UTF-8</encoding> - <source>1.8</source> - <target>1.8</target> - </configuration> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-shade-plugin</artifactId> - <version>3.0.0</version> - <executions> - <execution> - <phase>package</phase> - <goals> - <goal>shade</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + 
<encoding>UTF-8</encoding> + <source>1.8</source> + <target>1.8</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.0.0</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java deleted file mode 100644 index 571ee1c..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BinPruner.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.openrdf.query.Binding; - -/** - * Object that cleans up old {@link BindingSet}s corresponding to the specified - * {@link NodeBin}. This class deletes all BindingSets with the bin - * indicated by {@link NodeBin#getBin()}. A BindingSet corresponds to a given - * bin if it contains a {@link Binding} with name {@link IncrementalUpdateConstants#PERIODIC_BIN_ID} - * and value equal to the given bin. - * - */ -public interface BinPruner { - - /** - * Cleans up all {@link BindingSet}s associated with the indicated {@link NodeBin}. - * @param bin - NodeBin that indicates which BindingSets to delete.. - */ - public void pruneBindingSetBin(NodeBin bin); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java deleted file mode 100644 index 500a435..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/BindingSetExporter.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; -import org.apache.rya.periodic.notification.exporter.BindingSetRecord; - -/** - * An Object that is used to export {@link BindingSet}s to an external repository or queuing system. - * - */ -public interface BindingSetExporter { - - /** - * This method exports the BindingSet to the external repository or queuing system - * that this BindingSetExporter is configured to export to. 
- * @param bindingSet - {@link BindingSet} to be exported - * @throws ResultExportException - */ - public void exportNotification(BindingSetRecord bindingSet) throws ResultExportException; - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java deleted file mode 100644 index 60a3e7c..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/CreatePeriodicQuery.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.periodic.notification.api; - -import java.util.Optional; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.rya.api.client.CreatePCJ.ExportStrategy; -import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; -import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode; -import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; -import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; -import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; -import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; -import org.apache.rya.periodic.notification.application.PeriodicNotificationApplication; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.algebra.evaluation.function.Function; - -import com.google.common.collect.Sets; - -/** - * Object that creates a Periodic Query. A Periodic Query is any query - * requesting periodic updates about events that occurred within a given - * window of time of this instant. This is also known as a rolling window - * query. Period Queries can be expressed using SPARQL by including the - * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI} - * in the query. The user must provide this Function with the following arguments: - * the temporal variable in the query that will be filtered on, the window of time - * that events must occur within, the period at which the user wants to receive updates, - * and the time unit. The following query requests all observations that occurred - * within the last minute and requests updates every 15 seconds. It also performs - * a count on those observations. 
- * <li> - * <li> prefix function: http://org.apache.rya/function# - * <li> prefix time: http://www.w3.org/2006/time# - * <li> select (count(?obs) as ?total) where { - * <li> Filter(function:periodic(?time, 1, .25, time:minutes)) - * <li> ?obs uri:hasTime ?time. - * <li> ?obs uri:hasId ?id } - * <li> - * - * This class is responsible for taking a Periodic Query expressed as a SPARQL query - * and adding it to Fluo and Kafka so that it can be processed by the {@link PeriodicNotificationApplication}. - */ -public class CreatePeriodicQuery { - - private FluoClient fluoClient; - private PeriodicQueryResultStorage periodicStorage; - Function funciton; - PeriodicQueryUtil util; - - - public CreatePeriodicQuery(FluoClient fluoClient, PeriodicQueryResultStorage periodicStorage) { - this.fluoClient = fluoClient; - this.periodicStorage = periodicStorage; - } - - /** - * Creates a Periodic Query by adding the query to Fluo and using the resulting - * Fluo id to create a {@link PeriodicQueryResultStorage} table. - * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table - * @return PeriodicNotification that can be used to register this query with the {@link PeriodicNotificationApplication}. 
- */ - public PeriodicNotification createPeriodicQuery(String sparql) { - try { - Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql); - if(optNode.isPresent()) { - PeriodicQueryNode periodicNode = optNode.get(); - String pcjId = FluoQueryUtils.createNewPcjId(); - - //register query with Fluo - CreateFluoPcj createPcj = new CreateFluoPcj(); - createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluoClient); - - //register query with PeriodicResultStorage table - periodicStorage.createPeriodicQuery(pcjId, sparql); - //create notification - PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod()) - .timeUnit(periodicNode.getUnit()).build(); - return notification; - } else { - throw new RuntimeException("Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter."); - } - } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) { - throw new RuntimeException(e); - } - } - - /** - * Creates a Periodic Query by adding the query to Fluo and using the resulting - * Fluo id to create a {@link PeriodicQueryResultStorage} table. In addition, this - * method registers the PeriodicQuery with the PeriodicNotificationApplication to poll - * the PeriodicQueryResultStorage table at regular intervals and export results to Kafka. - * The PeriodicNotificationApp queries the result table at a regular interval indicated by the Period of - * the PeriodicQuery. 
- * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table - * @param periodicClient - registers the PeriodicQuery with the {@link PeriodicNotificationApplication} - * @return id of the PeriodicQuery and PeriodicQueryResultStorage table (these are the same) - */ - public String createQueryAndRegisterWithKafka(String sparql, PeriodicNotificationClient periodicClient) { - PeriodicNotification notification = createPeriodicQuery(sparql); - periodicClient.addNotification(notification); - return notification.getId(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java deleted file mode 100644 index b1e8bad..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/LifeCycle.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -/** - * Interface providing basic life cycle functionality, - * including stopping and starting any class implementing this - * interface and checking whether it is running. - * - */ -public interface LifeCycle { - - /** - * Starts a running application. - */ - public void start(); - - /** - * Stops a running application. - */ - public void stop(); - - /** - * Determine if application is currently running. - * @return true if application is running and false otherwise. - */ - public boolean currentlyRunning(); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java deleted file mode 100644 index 3ed7979..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NodeBin.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import java.util.Objects; - -/** - * Object used to indicate the id of a given Periodic Query - * along with a particular bin of results. This Object is used - * by the {@link BinPruner} to clean up old query results after - * they have been processed. - * - */ -public class NodeBin { - - private long bin; - private String nodeId; - - public NodeBin(String nodeId, long bin) { - this.bin = bin; - this.nodeId = nodeId; - } - - /** - * @return id of Periodic Query - */ - public String getNodeId() { - return nodeId; - } -/** - * @return bin id of results for a given Periodic Query - */ - public long getBin() { - return bin; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other instanceof NodeBin) { - NodeBin bin = (NodeBin) other; - return this.bin == bin.bin && this.nodeId.equals(bin.nodeId); - } - - return false; - } - - @Override - public int hashCode() { - return Objects.hash(bin, nodeId); - } - - @Override - public String toString() { - return new StringBuilder().append("Node Bin \n").append(" QueryId: " + nodeId + "\n").append(" Bin: " + bin + "\n").toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java 
---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java deleted file mode 100644 index 3e9e0d1..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/Notification.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -/** - * Notification Object used by the Periodic Query Service - * to inform workers to process results for a given Periodic - * Query with the indicated id. 
- * - */ -public interface Notification { - - /** - * @return id of a Periodic Query - */ - public String getId(); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java deleted file mode 100644 index d53dc17..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationCoordinatorExecutor.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.periodic.notification.api; - -import java.util.concurrent.ScheduledExecutorService; - -import org.apache.rya.periodic.notification.notification.CommandNotification; - -/** - * Object that manages the periodic notifications for the Periodic Query Service. - * This Object processes new requests for periodic updates by registering them with - * some sort of service that generates periodic updates (such as a {@link ScheduledExecutorService}). - * - */ -public interface NotificationCoordinatorExecutor extends LifeCycle { - - /** - * Registers or deletes a {@link CommandNotification} with the periodic service to - * generate notifications at a regular interval indicated by the CommandNotification. - * @param notification - CommandNotification to be registered or deleted from the periodic update - * service. - */ - public void processNextCommandNotification(CommandNotification notification); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java deleted file mode 100644 index 4ac9089..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/NotificationProcessor.java +++ /dev/null @@ -1,41 +0,0 @@ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import org.apache.rya.periodic.notification.notification.TimestampedNotification; - -/** - * Object that processes new {@link TimestampedNotification}s generated by {@link NotificationCoordinatorExecutor}. - * It is expected that the NotificationCoordinatorExecutor will provide this Object with notifications to perform work via some - * sort of queuing service such as a BlockingQueue or Kafka. This Object processes the notifications by retrieving - * query results associated with the Periodic Query id given by {@link TimestampedNotification#getId()}, parsing them - * and then providing them to another service to be exported. - * - */ -public interface NotificationProcessor { - - /** - * Processes {@link TimestampedNotification}s by retrieving the Periodic Query results - * associated with the query id given by {@link TimestampedNotification#getId()}. - * @param notification - contains information about which query results to retrieve - */ - public void processNotification(TimestampedNotification notification); - -}
