Repository: incubator-rya
Updated Branches:
  refs/heads/master ab8035a17 -> 2ca854271


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
new file mode 100644
index 0000000..baeb611
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.processor;
+
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+import org.apache.rya.periodic.notification.api.NotificationProcessor;
+import org.apache.rya.periodic.notification.exporter.BindingSetRecord;
+import 
org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter;
+import 
org.apache.rya.periodic.notification.notification.TimestampedNotification;
+import org.openrdf.query.BindingSet;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Implementation of {@link NotificationProcessor} that uses the id indicated by
+ * the {@link TimestampedNotification} to obtain results from the
+ * {@link PeriodicQueryResultStorage} layer containing the results of the
+ * Periodic Query. The TimestampedNotificationProcessor then parses the results
+ * and adds them to work queues to be processed by the {@link BinPruner} and the
+ * {@link KafkaPeriodicBindingSetExporter}.
+ */
+public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable {
+
+    private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class);
+    private final PeriodicQueryResultStorage periodicStorage;
+    // notifications to process
+    private final BlockingQueue<TimestampedNotification> notifications;
+    // entries to delete from Fluo
+    private final BlockingQueue<NodeBin> bins;
+    // query results to export
+    private final BlockingQueue<BindingSetRecord> bindingSets;
+    // set by shutdown(); checked by run() before each blocking take()
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    // only used to identify this processor in log messages
+    private final int threadNumber;
+
+    /**
+     * Creates a processor that drains the given notification queue.
+     *
+     * @param periodicStorage - storage layer that periodic results are read from
+     * @param notifications - work queue containing notifications to be processed
+     * @param bins - work queue that receives the NodeBins to be pruned
+     * @param bindingSets - work queue that receives the query results to be exported
+     * @param threadNumber - number identifying this processor in log messages
+     * @throws NullPointerException if the storage layer or any queue is null
+     */
+    public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicStorage,
+            BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins,
+            BlockingQueue<BindingSetRecord> bindingSets, int threadNumber) {
+        // Fail fast on a missing storage layer; otherwise every notification
+        // would fail later inside processNotification and only be logged.
+        this.periodicStorage = Preconditions.checkNotNull(periodicStorage);
+        this.notifications = Preconditions.checkNotNull(notifications);
+        this.bins = Preconditions.checkNotNull(bins);
+        this.bindingSets = Preconditions.checkNotNull(bindingSets);
+        this.threadNumber = threadNumber;
+    }
+
+    /**
+     * Processes the TimestampNotifications by scanning the PCJ tables for
+     * entries in the bin corresponding to
+     * {@link TimestampedNotification#getTimestamp()} and adding them to the
+     * export BlockingQueue. The TimestampNotification is then used to form a
+     * {@link NodeBin} that is passed to the BinPruner BlockingQueue so that the
+     * bins can be deleted from Fluo and Accumulo.
+     * <p>
+     * Any exception raised while reading or enqueueing results is logged and
+     * not rethrown; in that case the NodeBin is not added to the pruner queue.
+     *
+     * @param notification - notification identifying the query and the time bin to export
+     */
+    @Override
+    public void processNotification(TimestampedNotification notification) {
+
+        String id = notification.getId();
+        long ts = notification.getTimestamp().getTime();
+        long period = notification.getPeriod();
+        long bin = getBinFromTimestamp(ts, period);
+        NodeBin nodeBin = new NodeBin(id, bin);
+
+        try (CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(bin))) {
+
+            while (iter.hasNext()) {
+                bindingSets.add(new BindingSetRecord(iter.next(), id));
+            }
+            // add NodeBin to BinPruner queue so that bin can be deleted from
+            // Fluo and Accumulo
+            bins.add(nodeBin);
+        } catch (Exception e) {
+            // Best effort: keep the processor alive, but make the failure
+            // visible and keep the stack trace.
+            log.warn("Encountered error: " + e.getMessage() + " while accessing periodic results for bin: " + bin
+                    + " for query: " + id, e);
+        }
+    }
+
+    /**
+     * Computes the left end point of the bin containing event time ts, i.e. the
+     * largest multiple of period that is not greater than ts (for non-negative ts).
+     *
+     * @param ts - event time
+     * @param period - length of period
+     * @return left bin end point containing event time ts
+     * @throws IllegalArgumentException if period is not positive
+     */
+    private long getBinFromTimestamp(long ts, long period) {
+        Preconditions.checkArgument(period > 0);
+        return (ts / period) * period;
+    }
+
+    /**
+     * Takes notifications from the notification queue and processes them until
+     * {@link #shutdown()} has been called. Any exception that escapes the loop
+     * is rethrown wrapped in a RuntimeException, which ends this task.
+     */
+    @Override
+    public void run() {
+        try {
+            while (!closed.get()) {
+                processNotification(notifications.take());
+            }
+        } catch (Exception e) {
+            if (e instanceof InterruptedException) {
+                // Restore the interrupt status so the owning executor/thread
+                // can observe that this task was interrupted.
+                Thread.currentThread().interrupt();
+            }
+            log.trace("Thread_" + threadNumber + " is unable to process next notification.");
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * Requests that {@link #run()} stop. The flag is only checked between
+     * notifications, so a processor blocked waiting on the queue keeps waiting
+     * until a notification arrives or its thread is interrupted.
+     */
+    public void shutdown() {
+        closed.set(true);
+    }
+
+    /**
+     * @return a new Builder for creating a TimestampedNotificationProcessor
+     */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /**
+     * Builder for {@link TimestampedNotificationProcessor}.
+     */
+    public static class Builder {
+
+        private PeriodicQueryResultStorage periodicStorage;
+        // notifications to process
+        private BlockingQueue<TimestampedNotification> notifications;
+        // entries to delete from Fluo
+        private BlockingQueue<NodeBin> bins;
+        // query results to export
+        private BlockingQueue<BindingSetRecord> bindingSets;
+        private int threadNumber;
+
+        /**
+         * Set notification queue
+         * @param notifications - work queue containing notifications to be processed
+         * @return this Builder for chaining method calls
+         */
+        public Builder setNotifications(BlockingQueue<TimestampedNotification> notifications) {
+            this.notifications = notifications;
+            return this;
+        }
+
+        /**
+         * Set nodeBin queue
+         * @param bins - work queue containing NodeBins to be pruned
+         * @return this Builder for chaining method calls
+         */
+        public Builder setBins(BlockingQueue<NodeBin> bins) {
+            this.bins = bins;
+            return this;
+        }
+
+        /**
+         * Set BindingSet queue
+         * @param bindingSets - work queue containing BindingSets to be exported
+         * @return this Builder for chaining method calls
+         */
+        public Builder setBindingSets(BlockingQueue<BindingSetRecord> bindingSets) {
+            this.bindingSets = bindingSets;
+            return this;
+        }
+
+        /**
+         * Sets the number that identifies the processor in its log messages
+         * @param threadNumber - number identifying the processor being built
+         * @return this Builder for chaining method calls
+         */
+        public Builder setThreadNumber(int threadNumber) {
+            this.threadNumber = threadNumber;
+            return this;
+        }
+
+        /**
+         * Set the PeriodicStorage layer
+         * @param periodicStorage - periodic storage layer that periodic results are read from
+         * @return this Builder for chaining method calls
+         */
+        public Builder setPeriodicStorage(PeriodicQueryResultStorage periodicStorage) {
+            this.periodicStorage = periodicStorage;
+            return this;
+        }
+
+        /**
+         * Builds a TimestampedNotificationProcessor
+         * @return TimestampedNotificationProcessor built from arguments passed to this Builder
+         * @throws NullPointerException if the storage layer or any of the queues has not been set
+         */
+        public TimestampedNotificationProcessor build() {
+            return new TimestampedNotificationProcessor(periodicStorage, notifications, bins, bindingSets, threadNumber);
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
new file mode 100644
index 0000000..4dac64c
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import jline.internal.Preconditions;
+
+/**
+ * Deletes BindingSets from time bins in the indicated PCJ table
+ */
+public class AccumuloBinPruner implements BinPruner {
+
+    private static final Logger log = Logger.getLogger(AccumuloBinPruner.class);
+    private final PeriodicQueryResultStorage periodicStorage;
+
+    /**
+     * @param periodicStorage - storage layer the periodic results are deleted from
+     * @throws NullPointerException if periodicStorage is null
+     */
+    public AccumuloBinPruner(PeriodicQueryResultStorage periodicStorage) {
+        this.periodicStorage = Preconditions.checkNotNull(periodicStorage);
+    }
+
+    /**
+     * This method deletes all BindingSets in the indicated bin from the PCJ
+     * table indicated by the id. It is assumed that all BindingSet entries for
+     * the corresponding bin are written to the PCJ table so that the bin Id
+     * occurs first.
+     *
+     * @param nodeBin
+     *            - holds the pcj table id ({@code getNodeId()}) and the
+     *            temporal bin the BindingSets are contained in
+     *            ({@code getBin()})
+     * @throws NullPointerException
+     *             if nodeBin is null
+     * @throws RuntimeException
+     *             wrapping the PeriodicQueryStorageException if the delete
+     *             fails
+     */
+    @Override
+    public void pruneBindingSetBin(NodeBin nodeBin) {
+        Preconditions.checkNotNull(nodeBin);
+        String id = nodeBin.getNodeId();
+        long bin = nodeBin.getBin();
+        try {
+            periodicStorage.deletePeriodicQueryResults(id, bin);
+        } catch (PeriodicQueryStorageException e) {
+            // Carry the table id and bin in the exception itself so the
+            // context is not lost when trace logging is disabled.
+            String message = "Unable to delete results from Periodic Table: " + id + " for bin: " + bin;
+            log.trace(message);
+            throw new RuntimeException(message, e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
new file mode 100644
index 0000000..bee9c02
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Transaction;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.Column;
+import org.apache.fluo.api.data.Span;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO;
+import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import com.google.common.base.Optional;
+
+/**
+ * Deletes {@code BindingSet}s from the indicated Fluo table.
+ */
+public class FluoBinPruner implements BinPruner {
+
+    private static final Logger log = Logger.getLogger(FluoBinPruner.class);
+    private final FluoClient client;
+
+    /**
+     * @param client - Fluo client used to open the transaction that registers the delete
+     */
+    public FluoBinPruner(FluoClient client) {
+        this.client = client;
+    }
+
+    /**
+     * This method deletes BindingSets in the specified bin from the BindingSet
+     * Column of the indicated Fluo nodeId. The delete is registered as a
+     * {@link SpanBatchDeleteInformation} over the row prefix
+     * {@code nodeId + NODEID_BS_DELIM + bin} and committed in a single
+     * transaction.
+     *
+     * @param nodeBin
+     *            - holds the Fluo nodeId ({@code getNodeId()}) and the bin id
+     *            ({@code getBin()})
+     * @throws RuntimeException
+     *             if no NodeType can be determined from the nodeId
+     */
+    @Override
+    public void pruneBindingSetBin(NodeBin nodeBin) {
+        String id = nodeBin.getNodeId();
+        long bin = nodeBin.getBin();
+
+        // Validate the id before opening a transaction so that an invalid id
+        // never costs a Fluo transaction.
+        Optional<NodeType> type = NodeType.fromNodeId(id);
+        if (!type.isPresent()) {
+            String message = "Unable to determine NodeType from id: " + id;
+            log.trace(message);
+            throw new RuntimeException(message);
+        }
+
+        try (Transaction tx = client.newTransaction()) {
+            Column batchInfoColumn = type.get().getResultColumn();
+            String batchInfoSpanPrefix = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bin;
+            SpanBatchDeleteInformation batchInfo = SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn)
+                    .setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build();
+            BatchInformationDAO.addBatch(tx, id, batchInfo);
+            tx.commit();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
new file mode 100644
index 0000000..97e3f22
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.SnapshotBase;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
+import org.apache.rya.periodic.notification.api.BinPruner;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import jline.internal.Preconditions;
+
+/**
+ * Implementation of {@link BinPruner} that deletes old, already processed
+ * Periodic Query results from Fluo and the PCJ table to which the Fluo results
+ * are exported.
+ *
+ */
+public class PeriodicQueryPruner implements BinPruner, Runnable {
+
+    private static final Logger log = 
Logger.getLogger(PeriodicQueryPruner.class);
+    private FluoClient client;
+    private AccumuloBinPruner accPruner;
+    private FluoBinPruner fluoPruner;
+    private BlockingQueue<NodeBin> bins;
+    private AtomicBoolean closed = new AtomicBoolean(false);
+    private int threadNumber;
+
+    public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner 
accPruner, FluoClient client, BlockingQueue<NodeBin> bins, int threadNumber) {
+        this.fluoPruner = Preconditions.checkNotNull(fluoPruner);
+        this.accPruner = Preconditions.checkNotNull(accPruner);
+        this.client = Preconditions.checkNotNull(client);
+        this.bins = Preconditions.checkNotNull(bins);
+        this.threadNumber = threadNumber;
+    }
+    
+    @Override
+    public void run() {
+        try {
+            while (!closed.get()) {
+                pruneBindingSetBin(bins.take());
+            }
+        } catch (InterruptedException e) {
+            log.trace("Thread " + threadNumber + " is unable to prune the next 
message.");
+            throw new RuntimeException(e);
+        }
+    }
+    
+    /**
+     * Prunes BindingSet bins from the Rya Fluo Application in addition to the 
BindingSet
+     * bins created in the PCJ tables associated with the give query id.
+     * @param id - QueryResult Id for the Rya Fluo application 
+     * @param bin - bin id for bins to be deleted
+     */
+    @Override
+    public void pruneBindingSetBin(NodeBin nodeBin) {
+        String id = nodeBin.getNodeId();
+        long bin = nodeBin.getBin();
+        try(Snapshot sx = client.newSnapshot()) {
+            String queryId = sx.get(Bytes.of(id), 
FluoQueryColumns.PCJ_ID_QUERY_ID).toString();
+            Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId);
+            accPruner.pruneBindingSetBin(new NodeBin(id, bin));
+            for(String fluoId: fluoIds) {
+                fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin));
+            }
+        } catch (Exception e) {
+            log.trace("Could not successfully initialize 
PeriodicQueryBinPruner.");
+        }
+    }
+    
+    
+    public void shutdown() {
+        closed.set(true);
+    }
+
+    private Set<String> getNodeIdsFromResultId(SnapshotBase sx, String id) {
+        Set<String> ids = new HashSet<>();
+        PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, id, ids);
+        return ids;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
new file mode 100644
index 0000000..1c11f96
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.pruner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.FluoClient;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.NodeBin;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Executor service that runs {@link PeriodicQueryPruner}s with added functionality
+ * for starting, stopping, and determining if the query pruners are running.
+ */
+public class PeriodicQueryPrunerExecutor implements LifeCycle {
+
+    private static final Logger log = Logger.getLogger(PeriodicQueryPrunerExecutor.class);
+    private final FluoClient client;
+    private final int numThreads;
+    private final ExecutorService executor;
+    private final BlockingQueue<NodeBin> bins;
+    private final PeriodicQueryResultStorage periodicStorage;
+    private final List<PeriodicQueryPruner> pruners;
+    private boolean running = false;
+
+    /**
+     * @param periodicStorage - storage layer the periodic results are deleted from
+     * @param client - Fluo client shared by all pruners; closed by {@link #stop()}
+     * @param numThreads - number of pruner tasks to run
+     * @param bins - work queue containing the NodeBins to be pruned
+     * @throws IllegalArgumentException if numThreads is not positive
+     */
+    public PeriodicQueryPrunerExecutor(PeriodicQueryResultStorage periodicStorage, FluoClient client, int numThreads,
+            BlockingQueue<NodeBin> bins) {
+        Preconditions.checkArgument(numThreads > 0);
+        this.periodicStorage = periodicStorage;
+        this.numThreads = numThreads;
+        this.executor = Executors.newFixedThreadPool(numThreads);
+        this.bins = bins;
+        this.client = client;
+        this.pruners = new ArrayList<>();
+    }
+
+    /**
+     * Submits {@code numThreads} {@link PeriodicQueryPruner}s to the executor.
+     * All of them share one {@link AccumuloBinPruner}, one
+     * {@link FluoBinPruner} and the same work queue. Does nothing if the
+     * pruners are already running.
+     */
+    @Override
+    public void start() {
+        if (!running) {
+            AccumuloBinPruner accPruner = new AccumuloBinPruner(periodicStorage);
+            FluoBinPruner fluoPruner = new FluoBinPruner(client);
+
+            for (int threadNumber = 0; threadNumber < numThreads; threadNumber++) {
+                PeriodicQueryPruner pruner = new PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber);
+                pruners.add(pruner);
+                executor.submit(pruner);
+            }
+            running = true;
+        }
+    }
+
+    /**
+     * Signals every pruner to stop, shuts the executor down and waits up to
+     * five seconds for the pruner threads to finish, interrupting them if they
+     * do not. The Fluo client is closed last so that it is not closed while
+     * the wait for the pruner threads is still in progress.
+     */
+    @Override
+    public void stop() {
+        pruners.forEach(PeriodicQueryPruner::shutdown);
+        executor.shutdown();
+        running = false;
+        try {
+            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
+                log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            log.info("Interrupted during shutdown, exiting uncleanly");
+            // Cancel the remaining work and preserve the interrupt status
+            // for the caller.
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+    /**
+     * @return true if {@link #start()} has been called and {@link #stop()} has not been called since
+     */
+    @Override
+    public boolean currentlyRunning() {
+        return running;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
new file mode 100644
index 0000000..8e8b1a2
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.recovery;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.fluo.api.client.Snapshot;
+import org.apache.fluo.api.client.scanner.ColumnScanner;
+import org.apache.fluo.api.client.scanner.RowScanner;
+import org.apache.fluo.api.data.Bytes;
+import org.apache.fluo.api.data.ColumnValue;
+import org.apache.fluo.api.data.Span;
+import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
+import org.apache.rya.indexing.pcj.fluo.app.NodeType;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
+import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
+import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
+import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import 
org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import 
org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ * This class is used by the {@link PeriodicNotificationCoordinatorExecutor}
+ * to add all existing {@link PeriodicNotification}s stored in Fluo when it is
+ * initialized.  This enables the {@code PeriodicServiceApplication} to be
+ * recovered from failure by restoring its original state.
+ */
+public class PeriodicNotificationProvider {
+
+    private final FluoQueryMetadataDAO dao;
+
+    public PeriodicNotificationProvider() {
+        this.dao = new FluoQueryMetadataDAO();
+    }
+
+    /**
+     * Retrieve all of the information about Periodic Query results already registered
+     * with Fluo.  This is returned in the form of {@link CommandNotification}s that
+     * can be registered with the {@link NotificationCoordinatorExecutor}.
+     * @param sx - snapshot for reading results from Fluo
+     * @return - collection of CommandNotifications that indicate Periodic Query information registered with system
+     */
+    public Collection<CommandNotification> getNotifications(Snapshot sx) {
+        Set<PeriodicQueryMetadata> periodicMetadata = new HashSet<>();
+        // Scan the PERIODIC_QUERY_NODE_ID column of every row whose id starts
+        // with the periodic query prefix.
+        RowScanner scanner = sx.scanner().fetch(FluoQueryColumns.PERIODIC_QUERY_NODE_ID)
+                .over(Span.prefix(IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX)).byRow().build();
+        Iterator<ColumnScanner> colScannerIter = scanner.iterator();
+        while (colScannerIter.hasNext()) {
+            ColumnScanner colScanner = colScannerIter.next();
+            Iterator<ColumnValue> values = colScanner.iterator();
+            while (values.hasNext()) {
+                PeriodicQueryMetadata metadata = dao.readPeriodicQueryMetadata(sx, values.next().getsValue());
+                periodicMetadata.add(metadata);
+            }
+        }
+        return getCommandNotifications(sx, periodicMetadata);
+    }
+
+    /**
+     * Registers all of Periodic Query information already contained within Fluo to the
+     * {@link NotificationCoordinatorExecutor}.  The coordinator is started before the
+     * notifications are registered.
+     * @param coordinator - coordinator that periodic info will be registered with
+     * @param sx - snapshot for reading results from Fluo
+     */
+    public void processRegisteredNotifications(NotificationCoordinatorExecutor coordinator, Snapshot sx) {
+        coordinator.start();
+        Collection<CommandNotification> notifications = getNotifications(sx);
+        for (CommandNotification notification : notifications) {
+            coordinator.processNextCommandNotification(notification);
+        }
+    }
+
+    /**
+     * Builds one ADD {@link CommandNotification} per periodic query.  The fourth
+     * PeriodicNotification argument grows by 5000 for each successive query.
+     */
+    private Collection<CommandNotification> getCommandNotifications(Snapshot sx, Collection<PeriodicQueryMetadata> metadata) {
+        Set<CommandNotification> notifications = new HashSet<>();
+        int i = 1;
+        for (PeriodicQueryMetadata meta : metadata) {
+            //offset initial wait to avoid overloading system
+            PeriodicNotification periodic = new PeriodicNotification(getQueryId(meta.getNodeId(), sx),
+                    meta.getPeriod(), TimeUnit.MILLISECONDS, i * 5000);
+            notifications.add(new CommandNotification(Command.ADD, periodic));
+            i++;
+        }
+        return notifications;
+    }
+
+    private String getQueryId(String periodicNodeId, Snapshot sx) {
+        return getQueryIdFromPeriodicId(sx, periodicNodeId);
+    }
+
+    /**
+     * Walks up the parent-node columns starting at nodeId until a QUERY or
+     * CONSTRUCT node is reached and returns the id read from that node.
+     *
+     * @param sx - snapshot for reading results from Fluo
+     * @param nodeId - id of the node to start from
+     * @return id read from the QUERY or CONSTRUCT node that the walk ends at
+     * @throws IllegalArgumentException if no NodeType can be determined from a node id
+     * @throws RuntimeException if a node of an unsupported NodeType is encountered
+     */
+    private String getQueryIdFromPeriodicId(Snapshot sx, String nodeId) {
+        NodeType nodeType = NodeType.fromNodeId(nodeId).orNull();
+        if (nodeType == null) {
+            // switching on null would throw a bare NullPointerException
+            throw new IllegalArgumentException("Unable to determine NodeType from id: " + nodeId);
+        }
+        String id = null;
+        switch (nodeType) {
+        case FILTER:
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.FILTER_PARENT_NODE_ID).toString());
+            break;
+        case PERIODIC_QUERY:
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID).toString());
+            break;
+        case QUERY:
+            id = sx.get(Bytes.of(nodeId), FluoQueryColumns.RYA_PCJ_ID).toString();
+            break;
+        case AGGREGATION:
+            id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString());
+            break;
+        case CONSTRUCT:
+            id = sx.get(Bytes.of(nodeId), FluoQueryColumns.CONSTRUCT_NODE_ID).toString();
+            // keep only the part of the id that follows the construct prefix
+            id = id.split(IncrementalUpdateConstants.CONSTRUCT_PREFIX)[1];
+            break;
+        default:
+            throw new RuntimeException("Invalid NodeType.");
+        }
+        return id;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
new file mode 100644
index 0000000..f5cd13a
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.rya.periodic.notification.api.LifeCycle;
+import org.apache.rya.periodic.notification.api.Notification;
+import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Consumer group to pull all requests for adding and deleting {@link 
Notification}s
+ * from Kafka.  This Object executes {@link PeriodicNotificationConsumer}s 
that retrieve
+ * the {@link CommandNotification}s and register them with the {@link 
NotificationCoordinatorExecutor}.
+ *
+ */
+public class KafkaNotificationProvider implements LifeCycle {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationProvider.class);
+    /** Maximum time, in milliseconds, that {@link #stop()} waits for the consumer threads to finish. */
+    private static final long SHUTDOWN_TIMEOUT_MS = 5000;
+    private String topic;
+    private ExecutorService executor;
+    private NotificationCoordinatorExecutor coord;
+    private Properties props;
+    private int numThreads;
+    private boolean running = false;
+    Deserializer<String> keyDe;
+    Deserializer<CommandNotification> valDe;
+    List<PeriodicNotificationConsumer> consumers;
+
+    /**
+     * Create KafkaNotificationProvider for reading new notification requests from Kafka
+     * @param topic - notification topic
+     * @param keyDe - Kafka message key deserializer
+     * @param valDe - Kafka message value deserializer
+     * @param props - properties used to create a {@link KafkaConsumer}
+     * @param coord - {@link NotificationCoordinatorExecutor} for managing and generating notifications
+     * @param numThreads - number of threads used by this notification provider
+     */
+    public KafkaNotificationProvider(String topic, Deserializer<String> keyDe, Deserializer<CommandNotification> valDe, Properties props,
+            NotificationCoordinatorExecutor coord, int numThreads) {
+        this.coord = coord;
+        this.numThreads = numThreads;
+        this.topic = topic;
+        this.props = props;
+        this.consumers = new ArrayList<>();
+        this.keyDe = keyDe;
+        this.valDe = valDe;
+    }
+
+    /**
+     * Signals every consumer to shut down, then shuts down the thread pool and waits
+     * up to {@link #SHUTDOWN_TIMEOUT_MS} milliseconds for the consumer threads to exit.
+     * Calling this method before {@link #start()} is a no-op apart from clearing the
+     * running flag.
+     */
+    @Override
+    public void stop() {
+        if (consumers != null) {
+            for (PeriodicNotificationConsumer consumer : consumers) {
+                consumer.shutdown();
+            }
+            // Drop the shut down consumers so that a later start() does not accumulate them.
+            consumers.clear();
+        }
+        running = false;
+        if (executor == null) {
+            // start() was never called, so there is no thread pool to wait on.
+            return;
+        }
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
+                LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            LOG.info("Interrupted during shutdown, exiting uncleanly");
+            executor.shutdownNow();
+            // Preserve the interrupt status for code further up the call stack.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Starts the {@link NotificationCoordinatorExecutor} if it is not already running,
+     * then creates one {@link PeriodicNotificationConsumer} per configured thread and
+     * submits each of them to a fixed-size thread pool. Does nothing if this provider
+     * is already running.
+     */
+    public void start() {
+        if (running) {
+            return;
+        }
+        if (!coord.currentlyRunning()) {
+            coord.start();
+        }
+        // now launch all the threads
+        executor = Executors.newFixedThreadPool(numThreads);
+
+        // now create consumers to consume the messages
+        for (int threadNumber = 0; threadNumber < numThreads; threadNumber++) {
+            LOG.info("Creating consumer:{}", threadNumber);
+            KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<>(props, keyDe, valDe);
+            PeriodicNotificationConsumer periodicConsumer = new PeriodicNotificationConsumer(topic, consumer, threadNumber, coord);
+            consumers.add(periodicConsumer);
+            executor.submit(periodicConsumer);
+        }
+        running = true;
+    }
+
+    /**
+     * @return true if {@link #start()} has been called and {@link #stop()} has not been called since
+     */
+    @Override
+    public boolean currentlyRunning() {
+        return running;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java
new file mode 100644
index 0000000..ec94bb7
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration.kafka;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.api.PeriodicNotificationClient;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import 
org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+
+/**
+ *  Implementation of {@link PeriodicNotificationClient} used to register new notification
+ *  requests with the PeriodicQueryService. 
+ *
+ */
+public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient {
+
+    private final String topic;
+    private final KafkaProducer<String, CommandNotification> producer;
+
+    /**
+     * Creates a client that publishes notification commands to Kafka.
+     * @param topic - topic that the commands are written to
+     * @param producer - producer used to send the commands
+     */
+    public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) {
+        this.topic = topic;
+        this.producer = producer;
+    }
+
+    /**
+     * Publishes an ADD command wrapping the given notification.
+     */
+    @Override
+    public void addNotification(PeriodicNotification notification) {
+        CommandNotification addCommand = new CommandNotification(Command.ADD, notification);
+        publish(addCommand);
+    }
+
+    /**
+     * Publishes a DELETE command wrapping the given notification.
+     */
+    @Override
+    public void deleteNotification(BasicNotification notification) {
+        CommandNotification deleteCommand = new CommandNotification(Command.DELETE, notification);
+        publish(deleteCommand);
+    }
+
+    /**
+     * Publishes a DELETE command for a {@link BasicNotification} created from the given id.
+     */
+    @Override
+    public void deleteNotification(String notificationId) {
+        BasicNotification target = new BasicNotification(notificationId);
+        publish(new CommandNotification(Command.DELETE, target));
+    }
+
+    /**
+     * Builds a {@link PeriodicNotification} from the given values and publishes an
+     * ADD command wrapping it.
+     */
+    @Override
+    public void addNotification(String id, long period, long delay, TimeUnit unit) {
+        Notification built = PeriodicNotification.builder()
+                .id(id)
+                .period(period)
+                .initialDelay(delay)
+                .timeUnit(unit)
+                .build();
+        publish(new CommandNotification(Command.ADD, built));
+    }
+
+    /**
+     * Closes the underlying producer.
+     */
+    @Override
+    public void close() {
+        producer.close();
+    }
+
+    /**
+     * Sends the command to the configured topic, using the command's id as the record key.
+     */
+    private void publish(CommandNotification command) {
+        ProducerRecord<String, CommandNotification> record =
+                new ProducerRecord<String, CommandNotification>(topic, command.getId(), command);
+        producer.send(record);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
new file mode 100644
index 0000000..6785ce8
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.registration.kafka;
+
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.log4j.Logger;
+import 
org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+
+/**
+ * Consumer for the {@link KafkaNotificationProvider}.  This consumer pulls messages
+ * from Kafka and registers them with the {@link 
NotificationCoordinatorExecutor}.
+ *
+ */
+public class PeriodicNotificationConsumer implements Runnable {
+    private static final Logger LOG = Logger.getLogger(PeriodicNotificationConsumer.class);
+    /** Timeout, in milliseconds, passed to each poll of the Kafka consumer. */
+    private static final long POLL_TIMEOUT_MS = 10000;
+
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private KafkaConsumer<String, CommandNotification> consumer;
+    private NotificationCoordinatorExecutor coord;
+    private String topic;
+    private int threadNumber;
+
+    /**
+     * Creates a new PeriodicNotificationConsumer for consuming new notification requests from
+     * Kafka.
+     * @param topic - new notification topic
+     * @param consumer - consumer for pulling new requests from Kafka
+     * @param a_threadNumber - number used to identify this consumer in log messages
+     * @param coord - notification coordinator for managing and generating notifications
+     */
+    public PeriodicNotificationConsumer(String topic, KafkaConsumer<String, CommandNotification> consumer, int a_threadNumber,
+            NotificationCoordinatorExecutor coord) {
+        this.topic = topic;
+        this.consumer = consumer;
+        this.threadNumber = a_threadNumber;
+        this.coord = coord;
+    }
+
+    /**
+     * Subscribes to the topic and keeps polling until {@link #shutdown()} is called,
+     * handing every received {@link CommandNotification} to the coordinator. The
+     * Kafka consumer is closed when the loop exits, whether normally or by exception.
+     */
+    @Override
+    public void run() {
+        try {
+            LOG.info("Creating kafka stream for consumer:" + threadNumber);
+            consumer.subscribe(Arrays.asList(topic));
+            while (!closed.get()) {
+                forwardToCoordinator(consumer.poll(POLL_TIMEOUT_MS));
+            }
+        } catch (WakeupException e) {
+            // A wakeup is expected once shutdown() has been called; otherwise propagate it.
+            final boolean shuttingDown = closed.get();
+            if (!shuttingDown) {
+                throw e;
+            }
+        } finally {
+            consumer.close();
+        }
+    }
+
+    /**
+     * Marks this consumer as closed and wakes up the Kafka consumer so that a
+     * blocked poll returns.
+     */
+    public void shutdown() {
+        closed.set(true);
+        consumer.wakeup();
+    }
+
+    /**
+     * Logs each record's notification and passes it to the coordinator.
+     */
+    private void forwardToCoordinator(ConsumerRecords<String, CommandNotification> batch) {
+        for (ConsumerRecord<String, CommandNotification> entry : batch) {
+            final CommandNotification command = entry.value();
+            LOG.info("Thread " + threadNumber + " is adding notification " + command + " to queue.");
+            LOG.info("Message: " + command);
+            coord.processNextCommandNotification(command);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
new file mode 100644
index 0000000..bd29d29
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * {@link TypeAdapter} for {@link BasicNotification}s.  Used in {@link 
CommandNotificationTypeAdapter} to
+ * serialize {@link CommandNotification}s.  
+ *
+ */
+public class BasicNotificationTypeAdapter implements JsonDeserializer<BasicNotification>, JsonSerializer<BasicNotification> {
+
+    /** Name of the JSON member that holds the notification id. */
+    private static final String ID_MEMBER = "id";
+
+    /**
+     * Writes the notification as a JSON object with a single "id" member.
+     */
+    @Override
+    public JsonElement serialize(BasicNotification arg0, Type arg1, JsonSerializationContext arg2) {
+        final JsonObject json = new JsonObject();
+        final JsonPrimitive idValue = new JsonPrimitive(arg0.getId());
+        json.add(ID_MEMBER, idValue);
+        return json;
+    }
+
+    /**
+     * Reads the "id" member of the JSON object and wraps it in a {@link BasicNotification}.
+     */
+    @Override
+    public BasicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException {
+        final JsonElement idValue = arg0.getAsJsonObject().get(ID_MEMBER);
+        return new BasicNotification(idValue.getAsString());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
new file mode 100644
index 0000000..50180ad
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer;
+import 
org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
+import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+
+import com.google.common.base.Joiner;
+import com.google.common.primitives.Bytes;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for producing and 
consuming messages
+ * from Kafka.
+ *
+ */
+public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> {
+
+    private static final Logger log = Logger.getLogger(BindingSetSerDe.class);
+    private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer();
+    // Delimiter (U+0002) placed between the variable order and the binding set bytes.
+    // Written as a byte literal so that it does not depend on the platform default charset.
+    private static final byte[] DELIM_BYTE = { 0x02 };
+
+    /**
+     * Serializes the binding set, falling back to an empty array when serialization fails.
+     *
+     * @param bindingSet - binding set to serialize
+     * @return the serialized form, or an empty array if the binding set could not be serialized
+     */
+    private byte[] toBytes(BindingSet bindingSet) {
+        try {
+            return getBytes(getVarOrder(bindingSet), bindingSet);
+        } catch(Exception e) {
+            // Best effort: report the failure together with its cause and return an empty payload.
+            log.warn("Unable to serialize BindingSet: " + bindingSet, e);
+            return new byte[0];
+        }
+    }
+
+    /**
+     * Deserializes a payload produced by {@link #toBytes(BindingSet)}: the bytes before the
+     * first delimiter are the ';' separated variable order and the bytes after it are the
+     * serialized binding set.
+     *
+     * @param bsBytes - serialized variable order and binding set
+     * @return the deserialized binding set, or an empty {@link QueryBindingSet} if the
+     *         payload could not be deserialized
+     */
+    private BindingSet fromBytes(byte[] bsBytes) {
+        try {
+            int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE);
+            byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex);
+            byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + DELIM_BYTE.length, bsBytes.length);
+            VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";"));
+            return getBindingSet(varOrder, bsBytesNoVarOrder);
+        } catch(Exception e) {
+            // Best effort: a null payload or a payload without the delimiter also ends up here.
+            // Arrays.toString prints the byte values rather than the array's identity.
+            log.warn("Unable to deserialize BindingSet: " + Arrays.toString(bsBytes), e);
+            return new QueryBindingSet();
+        }
+    }
+
+    /**
+     * @return a variable order built from the binding names of the given binding set
+     */
+    private VariableOrder getVarOrder(BindingSet bs) {
+        return new VariableOrder(bs.getBindingNames());
+    }
+
+    /**
+     * Concatenates the UTF-8 bytes of the ';' joined variable order, the delimiter, and
+     * the binding set serialized with that variable order.
+     */
+    private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException {
+        byte[] bsBytes = serializer.convert(bs, varOrder);
+        String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders());
+        byte[] varOrderBytes = varOrderString.getBytes("UTF-8");
+        return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes);
+    }
+
+    /**
+     * Converts the serialized binding set back into a {@link BindingSet} using the given variable order.
+     */
+    private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException {
+        return serializer.convert(bsBytes, varOrder);
+    }
+
+    @Override
+    public BindingSet deserialize(String topic, byte[] bytes) {
+        return fromBytes(bytes);
+    }
+
+    @Override
+    public void close() {
+        // Do nothing. Nothing to close.
+    }
+
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+        // Do nothing.  Nothing to configure.
+    }
+
+    @Override
+    public byte[] serialize(String topic, BindingSet bs) {
+        return toBytes(bs);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
new file mode 100644
index 0000000..302e1be
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+/**
+ * Kafka {@link Serializer} and {@link Deserializer} for producing and 
consuming {@link CommandNotification}s
+ * to and from Kafka.
+ *
+ */
+public class CommandNotificationSerializer implements 
Serializer<CommandNotification>, Deserializer<CommandNotification> {
+
+    private static Gson gson = new GsonBuilder()
+            .registerTypeHierarchyAdapter(Notification.class, new 
CommandNotificationTypeAdapter()).create();
+    private static final Logger LOG = 
LoggerFactory.getLogger(CommandNotificationSerializer.class);
+
+    @Override
+    public CommandNotification deserialize(String topic, byte[] bytes) {
+        String json = null;
+        try {
+            json = new String(bytes, "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            LOG.info("Unable to deserialize notification for topic: " + topic);
+        }
+        return gson.fromJson(json, CommandNotification.class);
+    }
+
+    @Override
+    public byte[] serialize(String topic, CommandNotification command) {
+        try {
+            return gson.toJson(command).getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            LOG.info("Unable to serialize notification: " + command  + "for 
topic: " + topic);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void close() {
+        // Do nothing. Nothing to close
+    }
+    
+    @Override
+    public void configure(Map<String, ?> arg0, boolean arg1) {
+        // Do nothing. Nothing to configure
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
new file mode 100644
index 0000000..a9fb7e1
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+
+import org.apache.rya.periodic.notification.api.Notification;
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import 
org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * {@link TypeAdapter} used to serialize and deserialize {@link 
CommandNotification}s.
+ * This TypeAdapter is used in {@link CommandNotificationSerializer} for 
producing and 
+ * consuming messages to and from Kafka.
+ *
+ */
+public class CommandNotificationTypeAdapter
+        implements JsonDeserializer<CommandNotification>, JsonSerializer<CommandNotification> {
+
+    /**
+     * Writes the command as a JSON object with three members: "command" (the command
+     * name), "type" (the simple class name of the wrapped notification) and
+     * "notification" (the wrapped notification, serialized by the adapter for its type).
+     *
+     * @throws IllegalArgumentException if the wrapped notification is neither a
+     *         {@link PeriodicNotification} nor a {@link BasicNotification}
+     */
+    @Override
+    public JsonElement serialize(CommandNotification arg0, Type arg1, JsonSerializationContext arg2) {
+        final JsonObject json = new JsonObject();
+        json.add("command", new JsonPrimitive(arg0.getCommand().name()));
+
+        final Notification wrapped = arg0.getNotification();
+        final String typeName;
+        final JsonElement wrappedJson;
+        if (wrapped instanceof PeriodicNotification) {
+            typeName = PeriodicNotification.class.getSimpleName();
+            wrappedJson = new PeriodicNotificationTypeAdapter()
+                    .serialize((PeriodicNotification) wrapped, PeriodicNotification.class, arg2);
+        } else if (wrapped instanceof BasicNotification) {
+            typeName = BasicNotification.class.getSimpleName();
+            wrappedJson = new BasicNotificationTypeAdapter()
+                    .serialize((BasicNotification) wrapped, BasicNotification.class, arg2);
+        } else {
+            throw new IllegalArgumentException("Invalid notification type.");
+        }
+
+        json.add("type", new JsonPrimitive(typeName));
+        json.add("notification", wrappedJson);
+        return json;
+    }
+
+    /**
+     * Reads the "command" and "type" members and uses the type to pick the adapter
+     * that deserializes the "notification" member.
+     *
+     * @throws JsonParseException if the type is neither PeriodicNotification nor BasicNotification
+     */
+    @Override
+    public CommandNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2)
+            throws JsonParseException {
+
+        final JsonObject json = arg0.getAsJsonObject();
+        final Command command = Command.valueOf(json.get("command").getAsString());
+        final String typeName = json.get("type").getAsString();
+
+        if (PeriodicNotification.class.getSimpleName().equals(typeName)) {
+            final Notification periodic = new PeriodicNotificationTypeAdapter()
+                    .deserialize(json.get("notification"), PeriodicNotification.class, arg2);
+            return new CommandNotification(command, periodic);
+        }
+        if (BasicNotification.class.getSimpleName().equals(typeName)) {
+            final Notification basic = new BasicNotificationTypeAdapter()
+                    .deserialize(json.get("notification"), BasicNotification.class, arg2);
+            return new CommandNotification(command, basic);
+        }
+        throw new JsonParseException("Cannot deserialize Json");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
new file mode 100644
index 0000000..fcc0ba2
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import 
org.apache.rya.periodic.notification.notification.PeriodicNotification.Builder;
+
+import com.google.gson.JsonDeserializationContext;
+import com.google.gson.JsonDeserializer;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParseException;
+import com.google.gson.JsonPrimitive;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
+
+/**
+ * {@link TypeAdapter} used to serialize and deserialize {@link 
PeriodicNotification}s.
+ * This TypeAdapter is used in {@link CommandNotificationTypeAdapter} which is 
used in
+ * {@link CommandNotificationSerializer} for producing and consuming messages 
to and from
+ * Kafka.
+ *
+ */
+public class PeriodicNotificationTypeAdapter
+        implements JsonSerializer<PeriodicNotification>, 
JsonDeserializer<PeriodicNotification> {
+
+    @Override
+    public PeriodicNotification deserialize(JsonElement arg0, Type arg1, 
JsonDeserializationContext arg2)
+            throws JsonParseException {
+
+        JsonObject json = arg0.getAsJsonObject();
+        String id = json.get("id").getAsString();
+        long period = json.get("period").getAsLong();
+        TimeUnit periodTimeUnit = 
TimeUnit.valueOf(json.get("timeUnit").getAsString());
+        long initialDelay = json.get("initialDelay").getAsLong();
+        Builder builder = PeriodicNotification.builder().id(id).period(period)
+                .initialDelay(initialDelay).timeUnit(periodTimeUnit);
+
+        return builder.build();
+    }
+
+    @Override
+    public JsonElement serialize(PeriodicNotification arg0, Type arg1, 
JsonSerializationContext arg2) {
+
+        JsonObject result = new JsonObject();
+        result.add("id", new JsonPrimitive(arg0.getId()));
+        result.add("period", new JsonPrimitive(arg0.getPeriod()));
+        result.add("initialDelay", new JsonPrimitive(arg0.getInitialDelay()));
+        result.add("timeUnit", new JsonPrimitive(arg0.getTimeUnit().name()));
+
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.periodic.service/periodic.service.notification/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java
 
b/extras/rya.periodic.service/periodic.service.notification/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java
new file mode 100644
index 0000000..4aad1c6
--- /dev/null
+++ 
b/extras/rya.periodic.service/periodic.service.notification/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.periodic.notification.serialization;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.rya.periodic.notification.notification.BasicNotification;
+import org.apache.rya.periodic.notification.notification.CommandNotification;
+import 
org.apache.rya.periodic.notification.notification.CommandNotification.Command;
+import org.apache.rya.periodic.notification.notification.PeriodicNotification;
+import 
org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CommandNotificationSerializerTest {
+
+    private static final String TOPIC = "topic";
+
+    private final CommandNotificationSerializer serializer =
+            new CommandNotificationSerializer();
+
+    /**
+     * Round-trips three ADD commands wrapping periodic notifications and
+     * one wrapping a basic notification through the serializer, expecting
+     * each deserialized command to equal the one that was serialized.
+     */
+    @Test
+    public void basicSerializationTest() {
+        assertRoundTrip(periodicAdd(24, TimeUnit.DAYS, 1));
+        assertRoundTrip(periodicAdd(32, TimeUnit.SECONDS, 15));
+        assertRoundTrip(periodicAdd(32, TimeUnit.SECONDS, 15));
+
+        BasicNotification basic =
+                new BasicNotification(UUID.randomUUID().toString());
+        assertRoundTrip(new CommandNotification(Command.ADD, basic));
+    }
+
+    /**
+     * Builds an ADD command around a periodic notification that has a
+     * freshly generated random id.
+     */
+    private static CommandNotification periodicAdd(long period,
+            TimeUnit unit, long initialDelay) {
+        String id = UUID.randomUUID().toString();
+        PeriodicNotification periodic = PeriodicNotification.builder()
+                .id(id).period(period).timeUnit(unit)
+                .initialDelay(initialDelay).build();
+        return new CommandNotification(Command.ADD, periodic);
+    }
+
+    /**
+     * Serializes the command, deserializes the bytes again and asserts
+     * that the result equals the input.
+     */
+    private void assertRoundTrip(CommandNotification expected) {
+        CommandNotification actual = serializer.deserialize(TOPIC,
+                serializer.serialize(TOPIC, expected));
+        Assert.assertEquals(expected, actual);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.periodic.service/pom.xml 
b/extras/rya.periodic.service/pom.xml
new file mode 100644
index 0000000..fce4996
--- /dev/null
+++ b/extras/rya.periodic.service/pom.xml
@@ -0,0 +1,39 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+  <modelVersion>4.0.0</modelVersion>
+  <artifactId>rya.periodic.service</artifactId>
+  
+   <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.extras</artifactId>
+        <version>3.2.11-incubating-SNAPSHOT</version>
+    </parent>
+  
+  <name>Apache Rya Periodic Service</name>
+  <description>Parent POM for Rya Periodic Service</description>
+  
+  <packaging>pom</packaging>
+
+    <modules>
+        <module>periodic.service.notification</module>
+        <module>periodic.service.integration.tests</module>
+    </modules>
+  
+</project>
\ No newline at end of file

Reply via email to