Repository: incubator-rya Updated Branches: refs/heads/master ad6ab0185 -> 63f87b868
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java deleted file mode 100644 index ff08733..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/api/PeriodicNotificationClient.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.api; - -import java.util.concurrent.TimeUnit; - -import org.apache.rya.periodic.notification.notification.BasicNotification; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; - -/** - * Object to register {@link PeriodicNotification}s with an external queuing - * service to be handled by a {@link NotificationCoordinatorExecutor} service. - * The service will generate notifications to process Periodic Query results at regular - * intervals corresponding the period of the PeriodicNotification. - * - */ -public interface PeriodicNotificationClient extends AutoCloseable { - - /** - * Adds a new notification to be registered with the {@link NotificationCoordinatorExecutor} - * @param notification - notification to be added - */ - public void addNotification(PeriodicNotification notification); - - /** - * Deletes a notification from the {@link NotificationCoordinatorExecutor}. - * @param notification - notification to be deleted - */ - public void deleteNotification(BasicNotification notification); - - /** - * Deletes a notification from the {@link NotificationCoordinatorExecutor}. - * @param notification - id corresponding to the notification to be deleted - */ - public void deleteNotification(String notificationId); - - /** - * Adds a new notification with the indicated id and period to the {@link NotificationCoordinatorExecutor} - * @param id - Periodic Query id - * @param period - period indicating frequency at which notifications will be generated - * @param delay - initial delay for starting periodic notifications - * @param unit - time unit of delay and period - */ - public void addNotification(String id, long period, long delay, TimeUnit unit); - - public void close(); - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java index 6dd7126..92a7d18 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplication.java @@ -21,10 +21,10 @@ package org.apache.rya.periodic.notification.application; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.LifeCycle; import org.apache.rya.periodic.notification.api.NodeBin; import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.exporter.BindingSetRecord; import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor; import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor; import org.apache.rya.periodic.notification.pruner.PeriodicQueryPrunerExecutor; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java index 248b2bf..771a4ab 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/application/PeriodicNotificationApplicationFactory.java @@ -38,10 +38,10 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.rya.indexing.pcj.fluo.app.util.FluoClientFactory; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.NodeBin; import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; -import org.apache.rya.periodic.notification.exporter.BindingSetRecord; import org.apache.rya.periodic.notification.exporter.KafkaExporterExecutor; import org.apache.rya.periodic.notification.notification.TimestampedNotification; import org.apache.rya.periodic.notification.processor.NotificationProcessorExecutor; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/BindingSetRecord.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/BindingSetRecord.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/BindingSetRecord.java deleted file mode 100644 index 471b021..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/BindingSetRecord.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.exporter; - -import org.openrdf.query.BindingSet; - -import com.google.common.base.Objects; - -/** - * Object that associates a {@link BindingSet} with a given Kafka topic. - * This ensures that the {@link KafkaPeriodicBindingSetExporter} can export - * each BindingSet to its appropriate topic. - * - */ -public class BindingSetRecord { - - private BindingSet bs; - private String topic; - - public BindingSetRecord(BindingSet bs, String topic) { - this.bs = bs; - this.topic = topic; - } - - /** - * @return BindingSet in this BindingSetRecord - */ - public BindingSet getBindingSet() { - return bs; - } - - /** - * @return Kafka topic for this BindingSetRecord - */ - public String getTopic() { - return topic; - } - - @Override - public boolean equals(Object o) { - if(this == o) { - return true; - } - - if(o instanceof BindingSetRecord) { - BindingSetRecord record = (BindingSetRecord) o; - return Objects.equal(this.bs, record.bs)&&Objects.equal(this.topic,record.topic); - } - - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(bs, topic); - } - - @Override - public String toString() { - return new StringBuilder().append("Binding Set Record \n").append(" Topic: " + topic + "\n").append(" BindingSet: " + bs + "\n") - .toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java index 4880015..c2e5ebf 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaExporterExecutor.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.log4j.Logger; import org.apache.rya.periodic.notification.api.BindingSetExporter; +import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.LifeCycle; import org.openrdf.query.BindingSet; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java index 9baede3..8a0322f 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/exporter/KafkaPeriodicBindingSetExporter.java @@ -30,8 +30,9 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; import org.apache.rya.periodic.notification.api.BindingSetExporter; +import org.apache.rya.periodic.notification.api.BindingSetRecord; +import org.apache.rya.periodic.notification.api.BindingSetRecordExportException; import org.openrdf.model.Literal; import org.openrdf.query.BindingSet; @@ -64,7 +65,7 @@ public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runn * the indicated BindingSetRecord and the BindingSet is then exported to the topic. */ @Override - public void exportNotification(BindingSetRecord record) throws ResultExportException { + public void exportNotification(BindingSetRecord record) throws BindingSetRecordExportException { String bindingName = IncrementalUpdateConstants.PERIODIC_BIN_ID; BindingSet bindingSet = record.getBindingSet(); String topic = record.getTopic(); @@ -75,7 +76,7 @@ public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runn //wait for confirmation that results have been received future.get(5, TimeUnit.SECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new ResultExportException(e.getMessage()); + throw new BindingSetRecordExportException(e.getMessage()); } } @@ -85,7 +86,7 @@ public class KafkaPeriodicBindingSetExporter implements BindingSetExporter, Runn while (!closed.get()) { exportNotification(bindingSets.take()); } - } catch (InterruptedException | ResultExportException e) { + } catch (InterruptedException | BindingSetRecordExportException e) { log.trace("Thread " + threadNumber + " is unable to process message."); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java deleted file mode 100644 index c31a5c0..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/BasicNotification.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.notification; - -import org.apache.rya.periodic.notification.api.Notification; - -import com.google.common.base.Objects; - -/** - * Notification Object used by the Periodic Query Service - * to inform workers to process results for a given Periodic - * Query with the indicated id. - * - */ -public class BasicNotification implements Notification { - - private String id; - - /** - * Creates a BasicNotification - * @param id - Fluo query id associated with this Notification - */ - public BasicNotification(String id) { - this.id = id; - } - - /** - * @return the Fluo Query Id that this notification will generate results for - */ - @Override - public String getId() { - return id; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (other instanceof BasicNotification) { - BasicNotification not = (BasicNotification) other; - return Objects.equal(this.id, not.id); - } - - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(id); - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - return builder.append("id").append("=").append(id).toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java deleted file mode 100644 index 597b228..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/CommandNotification.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.notification; - -import org.apache.rya.periodic.notification.api.Notification; - -import com.google.common.base.Objects; -import com.google.common.base.Preconditions; - -/** - * This Object contains a Notification Object used by the Periodic Query Service - * to inform workers to process results for a given Periodic Query with the - * indicated id. Additionally, the CommandNotification contains a - * {@link Command} about which action the - * {@link NotificationCoordinatorExecutor} should take (adding or deleting). - * CommandNotifications are meant to be added to an external work queue (such as - * Kafka) to be processed by the NotificationCoordinatorExecutor. - * - */ -public class CommandNotification implements Notification { - - private Notification notification; - private Command command; - - public enum Command { - ADD, DELETE - }; - - /** - * Creates a new CommandNotification - * @param command - the command associated with this notification (either add, update, or delete) - * @param notification - the underlying notification associated with this command - */ - public CommandNotification(Command command, Notification notification) { - this.notification = Preconditions.checkNotNull(notification); - this.command = Preconditions.checkNotNull(command); - } - - @Override - public String getId() { - return notification.getId(); - } - - /** - * Returns {@link Notification} contained by this CommmandNotification. - * @return - Notification contained by this Object - */ - public Notification getNotification() { - return this.notification; - } - - /** - * @return Command contained by this Object (either add or delete) - */ - public Command getCommand() { - return this.command; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (other instanceof CommandNotification) { - CommandNotification cn = (CommandNotification) other; - return Objects.equal(this.command, cn.command) && Objects.equal(this.notification, cn.notification); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Objects.hashCode(command, notification); - } - - @Override - public String toString() { - return new StringBuilder().append("command").append("=").append(command.toString()).append(";") - .append(notification.toString()).toString(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java deleted file mode 100644 index aa9e581..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/PeriodicNotification.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.notification; - -import java.util.Objects; -import java.util.concurrent.TimeUnit; - -import org.apache.rya.periodic.notification.api.Notification; - -import com.google.common.base.Preconditions; - -/** - * Notification Object used by the Periodic Query Service to inform workers to - * process results for a given Periodic Query with the indicated id. - * Additionally, this Object contains a period that indicates a frequency at - * which regular updates are generated. - * - */ -public class PeriodicNotification implements Notification { - - private String id; - private long period; - private TimeUnit periodTimeUnit; - private long initialDelay; - - /** - * Creates a PeriodicNotification. - * @param id - Fluo Query Id that this notification is associated with - * @param period - period at which notifications are generated - * @param periodTimeUnit - time unit associated with the period and delay - * @param initialDelay - amount of time to wait before generating the first notification - */ - public PeriodicNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) { - this.id = Preconditions.checkNotNull(id); - this.periodTimeUnit = Preconditions.checkNotNull(periodTimeUnit); - Preconditions.checkArgument(period > 0 && initialDelay >= 0); - this.period = period; - this.initialDelay = initialDelay; - } - - - /** - * Create a PeriodicNotification - * @param other - other PeriodicNotification used in copy constructor - */ - public PeriodicNotification(PeriodicNotification other) { - this(other.id, other.period, other.periodTimeUnit, other.initialDelay); - } - - public String getId() { - return id; - } - - /** - * @return - period at which regular notifications are generated - */ - public long getPeriod() { - return period; - } - - /** - * @return time unit of period and initial delay - */ - public TimeUnit getTimeUnit() { - return periodTimeUnit; - } - - /** - * @return amount of time to delay before beginning to generate notifications - */ - public long getInitialDelay() { - return initialDelay; - } - - @Override - public String toString() { - StringBuilder builder = new StringBuilder(); - String delim = "="; - String delim2 = ";"; - return builder.append("id").append(delim).append(id).append(delim2).append("period").append(delim).append(period).append(delim2) - .append("periodTimeUnit").append(delim).append(periodTimeUnit).append(delim2).append("initialDelay").append(delim) - .append(initialDelay).toString(); - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - - if (!(other instanceof PeriodicNotification)) { - return false; - } - - PeriodicNotification notification = (PeriodicNotification) other; - return Objects.equals(this.id, notification.id) && (this.period == notification.period) - && Objects.equals(this.periodTimeUnit, notification.periodTimeUnit) && (this.initialDelay == notification.initialDelay); - } - - @Override - public int hashCode() { - return Objects.hash(id, period, periodTimeUnit, initialDelay); - } - - public static Builder builder() { - return new Builder(); - } - - public static class Builder { - - private String id; - private long period; - private TimeUnit periodTimeUnit; - private long initialDelay = 0; - - /** - * @param id - periodic query id - * @return - builder to chain method calls - */ - public Builder id(String id) { - this.id = id; - return this; - } - - /** - * @param period of the periodic notification for generating regular notifications - * @return - builder to chain method calls - */ - public Builder period(long period) { - this.period = period; - return this; - } - - /** - * @param timeUnit of period and initial delay - * @return - builder to chain method calls - */ - public Builder timeUnit(TimeUnit timeUnit) { - this.periodTimeUnit = timeUnit; - return this; - } - - /** - * @param initialDelay - amount of time to wait before generating notifications - * @return - builder to chain method calls - */ - public Builder initialDelay(long initialDelay) { - this.initialDelay = initialDelay; - return this; - } - - /** - * Builds PeriodicNotification - * @return PeriodicNotification constructed from Builder specified parameters - */ - public PeriodicNotification build() { - return new PeriodicNotification(id, period, periodTimeUnit, initialDelay); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java deleted file mode 100644 index 38073ce..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/notification/TimestampedNotification.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.notification; - -import java.util.Date; -import java.util.concurrent.TimeUnit; - -/** - * {@link PeriodicNotification} Object used by the Periodic Query Service to inform workers to - * process results for a given Periodic Query with the indicated id. Additionally - * this Object contains a {@link Date} object to indicate the date time at which this - * notification was generated. - * - */ -public class TimestampedNotification extends PeriodicNotification { - - private Date date; - - /** - * Constructs a TimestampedNotification - * @param id - Fluo Query Id associated with this Notification - * @param period - period at which notifications are generated - * @param periodTimeUnit - time unit associated with period and initial delay - * @param initialDelay - amount of time to wait before generating first notification - */ - public TimestampedNotification(String id, long period, TimeUnit periodTimeUnit, long initialDelay) { - super(id, period, periodTimeUnit, initialDelay); - date = new Date(); - } - - /** - * Creates a TimestampedNotification - * @param notification - PeriodicNotification used to create this TimestampedNotification. - * This constructor creates a time stamp for the TimestampedNotification. - */ - public TimestampedNotification(PeriodicNotification notification) { - super(notification); - date = new Date(); - } - - /** - * @return timestamp at which this notification was generated - */ - public Date getTimestamp() { - return date; - } - - @Override - public String toString() { - return super.toString() + ";date=" + date; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java index a363d5d..a9a5ad1 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/NotificationProcessorExecutor.java @@ -27,9 +27,9 @@ import java.util.concurrent.TimeUnit; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.LifeCycle; import org.apache.rya.periodic.notification.api.NodeBin; -import org.apache.rya.periodic.notification.exporter.BindingSetRecord; import org.apache.rya.periodic.notification.notification.TimestampedNotification; import com.google.common.base.Preconditions; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java index baeb611..8b65683 100644 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java @@ -26,9 +26,9 @@ import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.BindingSetRecord; import org.apache.rya.periodic.notification.api.NodeBin; import org.apache.rya.periodic.notification.api.NotificationProcessor; -import org.apache.rya.periodic.notification.exporter.BindingSetRecord; import org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter; import org.apache.rya.periodic.notification.notification.TimestampedNotification; import org.openrdf.query.BindingSet; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java deleted file mode 100644 index ec94bb7..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.registration.kafka; - -import java.util.concurrent.TimeUnit; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.rya.periodic.notification.api.Notification; -import org.apache.rya.periodic.notification.api.PeriodicNotificationClient; -import org.apache.rya.periodic.notification.notification.BasicNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification.Command; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; - -/** - * Implementation of {@link PeriodicNotificaitonClient} used to register new notification - * requests with the PeriodicQueryService. - * - */ -public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient { - - private KafkaProducer<String, CommandNotification> producer; - private String topic; - - public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) { - this.topic = topic; - this.producer = producer; - } - - @Override - public void addNotification(PeriodicNotification notification) { - processNotification(new CommandNotification(Command.ADD, notification)); - - } - - @Override - public void deleteNotification(BasicNotification notification) { - processNotification(new CommandNotification(Command.DELETE, notification)); - } - - @Override - public void deleteNotification(String notificationId) { - processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId))); - } - - @Override - public void addNotification(String id, long period, long delay, TimeUnit unit) { - Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build(); - processNotification(new CommandNotification(Command.ADD, notification)); - } - - - private void processNotification(CommandNotification notification) { - producer.send(new ProducerRecord<String, CommandNotification>(topic, notification.getId(), notification)); - } - - @Override - public void close() { - producer.close(); - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java deleted file mode 100644 index bd29d29..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.lang.reflect.Type; - -import org.apache.rya.periodic.notification.notification.BasicNotification; - -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonPrimitive; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; - -/** - * {@link TypeAdapter} for {@link BasicNotification}s. Used in {@link CommandNotificationTypeAdapter} to - * serialize {@link CommandNotification}s. - * - */ -public class BasicNotificationTypeAdapter implements JsonDeserializer<BasicNotification>, JsonSerializer<BasicNotification> { - - @Override - public JsonElement serialize(BasicNotification arg0, Type arg1, JsonSerializationContext arg2) { - JsonObject result = new JsonObject(); - result.add("id", new JsonPrimitive(arg0.getId())); - return result; - } - - @Override - public BasicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException { - JsonObject json = arg0.getAsJsonObject(); - String id = json.get("id").getAsString(); - return new BasicNotification(id); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java deleted file mode 100644 index 50180ad..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.io.UnsupportedEncodingException; -import java.util.Arrays; -import java.util.Map; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.openrdf.query.BindingSet; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.base.Joiner; -import com.google.common.primitives.Bytes; - -/** - * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming messages - * from Kafka. - * - */ -public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> { - - private static final Logger log = Logger.getLogger(BindingSetSerDe.class); - private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer(); - private static final byte[] DELIM_BYTE = "\u0002".getBytes(); - - private byte[] toBytes(BindingSet bindingSet) { - try { - return getBytes(getVarOrder(bindingSet), bindingSet); - } catch(Exception e) { - log.trace("Unable to serialize BindingSet: " + bindingSet); - return new byte[0]; - } - } - - private BindingSet fromBytes(byte[] bsBytes) { - try{ - int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE); - byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex); - byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length); - VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";")); - return getBindingSet(varOrder, bsBytesNoVarOrder); - } catch(Exception e) { - log.trace("Unable to deserialize BindingSet: " + bsBytes); - return new QueryBindingSet(); - } - } - - private VariableOrder getVarOrder(BindingSet bs) { - return new VariableOrder(bs.getBindingNames()); - } - - private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException { - byte[] bsBytes = serializer.convert(bs, varOrder); - String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders()); - byte[] varOrderBytes = varOrderString.getBytes("UTF-8"); - return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes); - } - - private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException { - return serializer.convert(bsBytes, varOrder); - } - - @Override - public BindingSet deserialize(String topic, byte[] bytes) { - return fromBytes(bytes); - } - - @Override - public void close() { - // Do nothing. Nothing to close. - } - - @Override - public void configure(Map<String, ?> arg0, boolean arg1) { - // Do nothing. Nothing to configure. - } - - @Override - public byte[] serialize(String topic, BindingSet bs) { - return toBytes(bs); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java deleted file mode 100644 index 302e1be..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.io.UnsupportedEncodingException; -import java.util.Map; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.rya.periodic.notification.api.Notification; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - -/** - * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s - * to and from Kafka. - * - */ -public class CommandNotificationSerializer implements Serializer<CommandNotification>, Deserializer<CommandNotification> { - - private static Gson gson = new GsonBuilder() - .registerTypeHierarchyAdapter(Notification.class, new CommandNotificationTypeAdapter()).create(); - private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class); - - @Override - public CommandNotification deserialize(String topic, byte[] bytes) { - String json = null; - try { - json = new String(bytes, "UTF-8"); - } catch (UnsupportedEncodingException e) { - LOG.info("Unable to deserialize notification for topic: " + topic); - } - return gson.fromJson(json, CommandNotification.class); - } - - @Override - public byte[] serialize(String topic, CommandNotification command) { - try { - return gson.toJson(command).getBytes("UTF-8"); - } catch (UnsupportedEncodingException e) { - LOG.info("Unable to serialize notification: " + command + "for topic: " + topic); - throw new RuntimeException(e); - } - } - - @Override - public void close() { - // Do nothing. Nothing to close - } - - @Override - public void configure(Map<String, ?> arg0, boolean arg1) { - // Do nothing. Nothing to configure - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java deleted file mode 100644 index a9fb7e1..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.lang.reflect.Type; - -import org.apache.rya.periodic.notification.api.Notification; -import org.apache.rya.periodic.notification.notification.BasicNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification; -import org.apache.rya.periodic.notification.notification.PeriodicNotification; -import org.apache.rya.periodic.notification.notification.CommandNotification.Command; - -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonPrimitive; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; - -/** - * {@link TypeAdapter} used to serialize and deserialize {@link CommandNotification}s. - * This TypeAdapter is used in {@link CommandNotificationSerializer} for producing and - * consuming messages to and from Kafka. - * - */ -public class CommandNotificationTypeAdapter - implements JsonDeserializer<CommandNotification>, JsonSerializer<CommandNotification> { - - @Override - public JsonElement serialize(CommandNotification arg0, Type arg1, JsonSerializationContext arg2) { - JsonObject result = new JsonObject(); - result.add("command", new JsonPrimitive(arg0.getCommand().name())); - Notification notification = arg0.getNotification(); - if (notification instanceof PeriodicNotification) { - result.add("type", new JsonPrimitive(PeriodicNotification.class.getSimpleName())); - PeriodicNotificationTypeAdapter adapter = new PeriodicNotificationTypeAdapter(); - result.add("notification", - adapter.serialize((PeriodicNotification) notification, PeriodicNotification.class, arg2)); - } else if (notification instanceof BasicNotification) { - result.add("type", new JsonPrimitive(BasicNotification.class.getSimpleName())); - BasicNotificationTypeAdapter adapter = new BasicNotificationTypeAdapter(); - result.add("notification", - adapter.serialize((BasicNotification) notification, BasicNotification.class, arg2)); - } else { - throw new IllegalArgumentException("Invalid notification type."); - } - return result; - } - - @Override - public CommandNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) - throws JsonParseException { - - JsonObject json = arg0.getAsJsonObject(); - Command command = Command.valueOf(json.get("command").getAsString()); - String type = json.get("type").getAsString(); - Notification notification = null; - if (type.equals(PeriodicNotification.class.getSimpleName())) { - notification = (new PeriodicNotificationTypeAdapter()).deserialize(json.get("notification"), - PeriodicNotification.class, arg2); - } else if (type.equals(BasicNotification.class.getSimpleName())) { - notification = (new BasicNotificationTypeAdapter()).deserialize(json.get("notification"), - BasicNotification.class, arg2); - } else { - throw new JsonParseException("Cannot deserialize Json"); - } - - return new CommandNotification(command, notification); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java deleted file mode 100644 index fcc0ba2..0000000 --- a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.periodic.notification.serialization; - -import java.lang.reflect.Type; -import java.util.concurrent.TimeUnit; - -import org.apache.rya.periodic.notification.notification.PeriodicNotification; -import org.apache.rya.periodic.notification.notification.PeriodicNotification.Builder; - -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.JsonParseException; -import com.google.gson.JsonPrimitive; -import com.google.gson.JsonSerializationContext; -import com.google.gson.JsonSerializer; - -/** - * {@link TypeAdapter} used to serialize and deserialize {@link PeriodicNotification}s. - * This TypeAdapter is used in {@link CommandNotificationTypeAdapter} which is used in - * {@link CommandNotificationSerializer} for producing and consuming messages to and from - * Kafka. - * - */ -public class PeriodicNotificationTypeAdapter - implements JsonSerializer<PeriodicNotification>, JsonDeserializer<PeriodicNotification> { - - @Override - public PeriodicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) - throws JsonParseException { - - JsonObject json = arg0.getAsJsonObject(); - String id = json.get("id").getAsString(); - long period = json.get("period").getAsLong(); - TimeUnit periodTimeUnit = TimeUnit.valueOf(json.get("timeUnit").getAsString()); - long initialDelay = json.get("initialDelay").getAsLong(); - Builder builder = PeriodicNotification.builder().id(id).period(period) - .initialDelay(initialDelay).timeUnit(periodTimeUnit); - - return builder.build(); - } - - @Override - public JsonElement serialize(PeriodicNotification arg0, Type arg1, JsonSerializationContext arg2) { - - JsonObject result = new JsonObject(); - result.add("id", new JsonPrimitive(arg0.getId())); - result.add("period", new JsonPrimitive(arg0.getPeriod())); - result.add("initialDelay", new JsonPrimitive(arg0.getInitialDelay())); - result.add("timeUnit", new JsonPrimitive(arg0.getTimeUnit().name())); - - return result; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.periodic.service/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/pom.xml b/extras/rya.periodic.service/pom.xml index fce4996..22ee1aa 100644 --- a/extras/rya.periodic.service/pom.xml +++ b/extras/rya.periodic.service/pom.xml @@ -34,6 +34,7 @@ under the License. <modules> <module>periodic.service.notification</module> <module>periodic.service.integration.tests</module> + <module>periodic.service.api</module> </modules> </project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java index 8fd95d3..7d0ab79 100644 --- a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java +++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java @@ -57,7 +57,10 @@ import com.google.common.base.Optional; public class RyaAdminCommands implements CommandMarker { public static final String CREATE_PCJ_CMD = "create-pcj"; + public static final String CREATE_PERIODIC_PCJ_CMD = "create-periodic-pcj"; public static final String DELETE_PCJ_CMD = "delete-pcj"; + public static final String DELETE_PERIODIC_PCJ_CMD = "delete-periodic-pcj"; + public static final String LIST_INCREMENTAL_QUERIES = "list-incremental-queries"; public static final String PRINT_INSTANCE_DETAILS_CMD = "print-instance-details"; public static final String INSTALL_CMD = "install"; public static final String INSTALL_PARAMETERS_CMD = "install-with-parameters"; @@ -129,7 +132,10 @@ public class RyaAdminCommands implements CommandMarker { */ @CliAvailabilityIndicator({ CREATE_PCJ_CMD, - DELETE_PCJ_CMD }) + DELETE_PCJ_CMD, + CREATE_PERIODIC_PCJ_CMD, + DELETE_PERIODIC_PCJ_CMD, + LIST_INCREMENTAL_QUERIES}) public boolean arePCJCommandsAvailable() { // The PCJ commands are only available if the Shell is connected to an instance of Rya // that is new enough to use the RyaDetailsRepository and is configured to maintain PCJs. @@ -341,6 +347,79 @@ public class RyaAdminCommands implements CommandMarker { throw new RuntimeException("The PCJ could not be deleted. Provided reason: " + e.getMessage(), e); } } + + @CliCommand(value = CREATE_PERIODIC_PCJ_CMD, help = "Creates and starts the maintenance of a new Periodic PCJ and registers the associated Periodic Notification with Kafka.") + public String createPeriodicPcj( + @CliOption(key = {"topic"}, mandatory = true, help = "Kafka topic for registering new PeriodicNotifications. This topic is monitored by the Periodic Notification Service.") + String topic, + @CliOption(key = {"brokers"}, mandatory = true, help = "Comma delimited list of host/port pairs to establish the initial connection to the Kafka cluster.") + String brokers) { + // Fetch the command that is connected to the store. + final ShellState shellState = state.getShellState(); + final RyaClient commands = shellState.getConnectedCommands().get(); + final String ryaInstance = shellState.getRyaInstanceName().get(); + + try { + // Prompt the user for the SPARQL. + final Optional<String> sparql = sparqlPrompt.getSparql(); + if (sparql.isPresent()) { + // Execute the command. + final String pcjId = commands.getCreatePeriodicPCJ().createPeriodicPCJ(ryaInstance, sparql.get(), topic, brokers); + // Return a message that indicates the ID of the newly created ID. + return String.format("The Periodic PCJ has been created. Its ID is '%s'.", pcjId); + } else { + return ""; // user aborted the SPARQL prompt. + } + } catch (final InstanceDoesNotExistException e) { + throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstance), e); + } catch (final IOException | RyaClientException e) { + throw new RuntimeException("Could not create the Periodic PCJ. Provided reasons: " + e.getMessage(), e); + } + } + + @CliCommand(value = DELETE_PERIODIC_PCJ_CMD, help = "Deletes and halts maintenance of a Periodic PCJ.") + public String deletePeriodicPcj( + @CliOption(key = {"pcjId"}, mandatory = true, help = "The ID of the PCJ that will be deleted.") + final String pcjId, + @CliOption(key = {"topic"}, mandatory = true, help = "Kafka topic for registering a delete notice to remove a PeriodicNotification from the Periodic Notification Service.") + final String topic, + @CliOption(key = {"brokers"}, mandatory = true, help = "Comma delimited list of host/port pairs to establish the initial connection to the Kafka cluster.") + final String brokers + ) { + // Fetch the command that is connected to the store. + final ShellState shellState = state.getShellState(); + final RyaClient commands = shellState.getConnectedCommands().get(); + final String ryaInstance = shellState.getRyaInstanceName().get(); + + try { + // Execute the command. + commands.getDeletePeriodicPCJ().deletePeriodicPCJ(ryaInstance, pcjId, topic, brokers); + return "The Periodic PCJ has been deleted."; + + } catch (final InstanceDoesNotExistException e) { + throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstance), e); + } catch (final RyaClientException e) { + throw new RuntimeException("The Periodic PCJ could not be deleted. Provided reason: " + e.getMessage(), e); + } + } + + + @CliCommand(value = LIST_INCREMENTAL_QUERIES, help = "Lists relevant information about all SPARQL queries maintained by the Fluo application.") + public String listFluoQueries() { + // Fetch the command that is connected to the store. + final ShellState shellState = state.getShellState(); + final RyaClient commands = shellState.getConnectedCommands().get(); + final String ryaInstance = shellState.getRyaInstanceName().get(); + + try { + return commands.getListIncrementalQueries().listIncrementalQueries(ryaInstance); + } catch (final InstanceDoesNotExistException e) { + throw new RuntimeException(String.format("A Rya instance named '%s' does not exist.", ryaInstance), e); + } catch (RyaClientException e) { + throw new RuntimeException("Could not list incremental queries. Provided reasons: " + e.getMessage(), e); + } + } + @CliCommand(value = ADD_USER_CMD, help = "Adds an authorized user to the Rya instance.") public void addUser( http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java b/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java index cab34e9..f08e02a 100644 --- a/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java +++ b/extras/shell/src/test/java/org/apache/rya/shell/RyaAdminCommandsTest.java @@ -34,6 +34,7 @@ import java.util.TimeZone; import org.apache.rya.api.client.AddUser; import org.apache.rya.api.client.CreatePCJ; import org.apache.rya.api.client.DeletePCJ; +import org.apache.rya.api.client.DeletePeriodicPCJ; import org.apache.rya.api.client.GetInstanceDetails; import org.apache.rya.api.client.Install; import org.apache.rya.api.client.Install.DuplicateInstanceNameException; @@ -45,6 +46,7 @@ import org.apache.rya.api.client.RyaClient; import org.apache.rya.api.client.RyaClientException; import org.apache.rya.api.client.Uninstall; import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePeriodicPCJ; import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; import org.apache.rya.api.instance.RyaDetails; import org.apache.rya.api.instance.RyaDetails.EntityCentricIndexDetails; @@ -151,6 +153,69 @@ public class RyaAdminCommandsTest { final String expected = "The PCJ has been deleted."; assertEquals(expected, message); } + + @Test + public void createPeriodicPCJ() throws InstanceDoesNotExistException, RyaClientException, IOException { + // Mock the object that performs the create operation. + final String instanceName = "unitTest"; + final String sparql = "SELECT * WHERE { ?person <http://isA> ?noun }"; + final String topic = "topic"; + final String brokers = "brokers"; + final String pcjId = "12341234"; + final CreatePeriodicPCJ mockCreatePCJ = mock(CreatePeriodicPCJ.class); + when(mockCreatePCJ.createPeriodicPCJ( eq(instanceName), eq(sparql), eq(topic), eq(brokers) )).thenReturn( pcjId ); + + final RyaClient mockCommands = mock(RyaClient.class); + when(mockCommands.getCreatePeriodicPCJ()).thenReturn( mockCreatePCJ ); + + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands); + state.connectedToInstance(instanceName); + + final SparqlPrompt mockSparqlPrompt = mock(SparqlPrompt.class); + when(mockSparqlPrompt.getSparql()).thenReturn(Optional.of(sparql)); + + // Execute the command. + final RyaAdminCommands commands = new RyaAdminCommands(state, mock(InstallPrompt.class), mockSparqlPrompt, mock(UninstallPrompt.class)); + final String message = commands.createPeriodicPcj(topic, brokers); + + // Verify the values that were provided to the command were passed through to CreatePCJ. + verify(mockCreatePCJ).createPeriodicPCJ(eq(instanceName), eq(sparql), eq(topic), eq(brokers)); + + // Verify a message is returned that explains what was created. + final String expected = "The Periodic PCJ has been created. Its ID is '12341234'."; + assertEquals(expected, message); + } + + @Test + public void deletePeriodicPCJ() throws InstanceDoesNotExistException, RyaClientException { + // Mock the object that performs the delete operation. + final DeletePeriodicPCJ mockDeletePCJ = mock(DeletePeriodicPCJ.class); + + final RyaClient mockCommands = mock(RyaClient.class); + when(mockCommands.getDeletePeriodicPCJ()).thenReturn( mockDeletePCJ ); + + final SharedShellState state = new SharedShellState(); + state.connectedToAccumulo(mock(AccumuloConnectionDetails.class), mockCommands); + final String instanceName = "unitTests"; + state.connectedToInstance(instanceName); + + // Execute the command. + final String pcjId = "123412342"; + final String topic = "topic"; + final String brokers = "brokers"; + + final RyaAdminCommands commands = new RyaAdminCommands(state, mock(InstallPrompt.class), mock(SparqlPrompt.class), mock(UninstallPrompt.class)); + final String message = commands.deletePeriodicPcj(pcjId, topic, brokers); + + // Verify the values that were provided to the command were passed through to the DeletePCJ. + verify(mockDeletePCJ).deletePeriodicPCJ(eq(instanceName), eq(pcjId), eq(topic), eq(brokers)); + + // Verify a message is returned that explains what was deleted. + final String expected = "The Periodic PCJ has been deleted."; + assertEquals(expected, message); + } + @Test public void getInstanceDetails() throws InstanceDoesNotExistException, RyaClientException { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 9516b9c..51822be 100644 --- a/pom.xml +++ b/pom.xml @@ -244,6 +244,26 @@ under the License. <artifactId>rya.pcj.fluo.app</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service.api</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service.notification</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service.integration.tests</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId>
