Repository: incubator-rya Updated Branches: refs/heads/master ab8035a17 -> 2ca854271
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java new file mode 100644 index 0000000..baeb611 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/processor/TimestampedNotificationProcessor.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.processor; + +import java.util.Optional; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; +import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.NodeBin; +import org.apache.rya.periodic.notification.api.NotificationProcessor; +import org.apache.rya.periodic.notification.exporter.BindingSetRecord; +import org.apache.rya.periodic.notification.exporter.KafkaPeriodicBindingSetExporter; +import org.apache.rya.periodic.notification.notification.TimestampedNotification; +import org.openrdf.query.BindingSet; + +import com.google.common.base.Preconditions; + +/** + * Implementation of {@link NotificationProcessor} that uses the id indicated by + * the {@link TimestampedNotification} to obtain results from the + * {@link PeriodicQueryResultStorage} layer containing the results of the + * Periodic Query. The TimestampedNotificationProcessor then parses the results + * and adds them to work queues to be processed by the {@link BinPruner} and the + * {@link KafkaPeriodicBindingSetExporter}. 
+ * + */ +public class TimestampedNotificationProcessor implements NotificationProcessor, Runnable { + + private static final Logger log = Logger.getLogger(TimestampedNotificationProcessor.class); + private PeriodicQueryResultStorage periodicStorage; + private BlockingQueue<TimestampedNotification> notifications; // notifications + // to process + private BlockingQueue<NodeBin> bins; // entries to delete from Fluo + private BlockingQueue<BindingSetRecord> bindingSets; // query results to export + private AtomicBoolean closed = new AtomicBoolean(false); + private int threadNumber; + + + public TimestampedNotificationProcessor(PeriodicQueryResultStorage periodicStorage, + BlockingQueue<TimestampedNotification> notifications, BlockingQueue<NodeBin> bins, BlockingQueue<BindingSetRecord> bindingSets, + int threadNumber) { + this.notifications = Preconditions.checkNotNull(notifications); + this.bins = Preconditions.checkNotNull(bins); + this.bindingSets = Preconditions.checkNotNull(bindingSets); + this.periodicStorage = periodicStorage; + this.threadNumber = threadNumber; + } + + /** + * Processes the TimestampNotifications by scanning the PCJ tables for + * entries in the bin corresponding to + * {@link TimestampedNotification#getTimestamp()} and adding them to the + * export BlockingQueue. The TimestampNotification is then used to form a + * {@link NodeBin} that is passed to the BinPruner BlockingQueue so that the + * bins can be deleted from Fluo and Accumulo. 
+ */ + @Override + public void processNotification(TimestampedNotification notification) { + + String id = notification.getId(); + long ts = notification.getTimestamp().getTime(); + long period = notification.getPeriod(); + long bin = getBinFromTimestamp(ts, period); + NodeBin nodeBin = new NodeBin(id, bin); + + try (CloseableIterator<BindingSet> iter = periodicStorage.listResults(id, Optional.of(bin));) { + + while(iter.hasNext()) { + bindingSets.add(new BindingSetRecord(iter.next(), id)); + } + // add NodeBin to BinPruner queue so that bin can be deleted from + // Fluo and Accumulo + bins.add(nodeBin); + } catch (Exception e) { + log.debug("Encountered error: " + e.getMessage() + " while accessing periodic results for bin: " + bin + " for query: " + id); + } + } + + /** + * Computes left bin end point containing event time ts + * + * @param ts - event time + * @param start - time that periodic event began + * @param period - length of period + * @return left bin end point containing event time ts + */ + private long getBinFromTimestamp(long ts, long period) { + Preconditions.checkArgument(period > 0); + return (ts / period) * period; + } + + @Override + public void run() { + try { + while(!closed.get()) { + processNotification(notifications.take()); + } + } catch (Exception e) { + log.trace("Thread_" + threadNumber + " is unable to process next notification."); + throw new RuntimeException(e); + } + + } + + public void shutdown() { + closed.set(true); + } + + public static Builder builder() { + return new Builder(); + } + + + + public static class Builder { + + private PeriodicQueryResultStorage periodicStorage; + private BlockingQueue<TimestampedNotification> notifications; // notifications to process + private BlockingQueue<NodeBin> bins; // entries to delete from Fluo + private BlockingQueue<BindingSetRecord> bindingSets; // query results to export + + private int threadNumber; + + /** + * Set notification queue + * @param notifications - work queue containing 
notifications to be processed + * @return this Builder for chaining method calls + */ + public Builder setNotifications(BlockingQueue<TimestampedNotification> notifications) { + this.notifications = notifications; + return this; + } + + /** + * Set nodeBin queue + * @param bins - work queue containing NodeBins to be pruned + * @return this Builder for chaining method calls + */ + public Builder setBins(BlockingQueue<NodeBin> bins) { + this.bins = bins; + return this; + } + + /** + * Set BindingSet queue + * @param bindingSets - work queue containing BindingSets to be exported + * @return this Builder for chaining method calls + */ + public Builder setBindingSets(BlockingQueue<BindingSetRecord> bindingSets) { + this.bindingSets = bindingSets; + return this; + } + + /** + * Sets the number of threads used by this processor + * @param threadNumber - number of threads used by this processor + * @return - number of threads used by this processor + */ + public Builder setThreadNumber(int threadNumber) { + this.threadNumber = threadNumber; + return this; + } + + /** + * Set the PeriodicStorage layer + * @param periodicStorage - periodic storage layer that periodic results are read from + * @return - this Builder for chaining method calls + */ + public Builder setPeriodicStorage(PeriodicQueryResultStorage periodicStorage) { + this.periodicStorage = periodicStorage; + return this; + } + + /** + * Builds a TimestampedNotificationProcessor + * @return - TimestampedNotificationProcessor built from arguments passed to this Builder + */ + public TimestampedNotificationProcessor build() { + return new TimestampedNotificationProcessor(periodicStorage, notifications, bins, bindingSets, threadNumber); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java 
---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java new file mode 100644 index 0000000..4dac64c --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/AccumuloBinPruner.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.pruner; + +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.NodeBin; + +import jline.internal.Preconditions; + +/** + * Deletes BindingSets from time bins in the indicated PCJ table + */ +public class AccumuloBinPruner implements BinPruner { + + private static final Logger log = Logger.getLogger(AccumuloBinPruner.class); + private PeriodicQueryResultStorage periodicStorage; + + public AccumuloBinPruner(PeriodicQueryResultStorage periodicStorage) { + Preconditions.checkNotNull(periodicStorage); + this.periodicStorage = periodicStorage; + } + + /** + * This method deletes all BindingSets in the indicated bin from the PCJ + * table indicated by the id. It is assumed that all BindingSet entries for + * the corresponding bin are written to the PCJ table so that the bin Id + * occurs first. 
+ * + * @param id + * - pcj table id + * @param bin + * - temporal bin the BindingSets are contained in + */ + @Override + public void pruneBindingSetBin(NodeBin nodeBin) { + Preconditions.checkNotNull(nodeBin); + String id = nodeBin.getNodeId(); + long bin = nodeBin.getBin(); + try { + periodicStorage.deletePeriodicQueryResults(id, bin); + } catch (PeriodicQueryStorageException e) { + log.trace("Unable to delete results from Peroidic Table: " + id + " for bin: " + bin); + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java new file mode 100644 index 0000000..bee9c02 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/FluoBinPruner.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.pruner; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.batch.BatchInformationDAO; +import org.apache.rya.indexing.pcj.fluo.app.batch.SpanBatchDeleteInformation; +import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.NodeBin; + +import com.google.common.base.Optional; + +/** + * Deletes {@link BindingSet}s from the indicated Fluo table. 
+ */ +public class FluoBinPruner implements BinPruner { + + private static final Logger log = Logger.getLogger(FluoBinPruner.class); + private FluoClient client; + + public FluoBinPruner(FluoClient client) { + this.client = client; + } + + /** + * This method deletes BindingSets in the specified bin from the BindingSet + * Column of the indicated Fluo nodeId + * + * @param id + * - Fluo nodeId + * @param bin + * - bin id + */ + @Override + public void pruneBindingSetBin(NodeBin nodeBin) { + String id = nodeBin.getNodeId(); + long bin = nodeBin.getBin(); + try (Transaction tx = client.newTransaction()) { + Optional<NodeType> type = NodeType.fromNodeId(id); + if (!type.isPresent()) { + log.trace("Unable to determine NodeType from id: " + id); + throw new RuntimeException(); + } + Column batchInfoColumn = type.get().getResultColumn(); + String batchInfoSpanPrefix = id + IncrementalUpdateConstants.NODEID_BS_DELIM + bin; + SpanBatchDeleteInformation batchInfo = SpanBatchDeleteInformation.builder().setColumn(batchInfoColumn) + .setSpan(Span.prefix(Bytes.of(batchInfoSpanPrefix))).build(); + BatchInformationDAO.addBatch(tx, id, batchInfo); + tx.commit(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java new file mode 100644 index 0000000..97e3f22 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPruner.java @@ -0,0 +1,108 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.pruner; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.periodic.notification.api.BinPruner; +import org.apache.rya.periodic.notification.api.NodeBin; + +import jline.internal.Preconditions; + +/** + * Implementation of {@link BinPruner} that deletes old, already processed + * Periodic Query results from Fluo and the PCJ table to which the Fluo results + * are exported. 
+ * + */ +public class PeriodicQueryPruner implements BinPruner, Runnable { + + private static final Logger log = Logger.getLogger(PeriodicQueryPruner.class); + private FluoClient client; + private AccumuloBinPruner accPruner; + private FluoBinPruner fluoPruner; + private BlockingQueue<NodeBin> bins; + private AtomicBoolean closed = new AtomicBoolean(false); + private int threadNumber; + + public PeriodicQueryPruner(FluoBinPruner fluoPruner, AccumuloBinPruner accPruner, FluoClient client, BlockingQueue<NodeBin> bins, int threadNumber) { + this.fluoPruner = Preconditions.checkNotNull(fluoPruner); + this.accPruner = Preconditions.checkNotNull(accPruner); + this.client = Preconditions.checkNotNull(client); + this.bins = Preconditions.checkNotNull(bins); + this.threadNumber = threadNumber; + } + + @Override + public void run() { + try { + while (!closed.get()) { + pruneBindingSetBin(bins.take()); + } + } catch (InterruptedException e) { + log.trace("Thread " + threadNumber + " is unable to prune the next message."); + throw new RuntimeException(e); + } + } + + /** + * Prunes BindingSet bins from the Rya Fluo Application in addition to the BindingSet + * bins created in the PCJ tables associated with the give query id. 
+ * @param id - QueryResult Id for the Rya Fluo application + * @param bin - bin id for bins to be deleted + */ + @Override + public void pruneBindingSetBin(NodeBin nodeBin) { + String id = nodeBin.getNodeId(); + long bin = nodeBin.getBin(); + try(Snapshot sx = client.newSnapshot()) { + String queryId = sx.get(Bytes.of(id), FluoQueryColumns.PCJ_ID_QUERY_ID).toString(); + Set<String> fluoIds = getNodeIdsFromResultId(sx, queryId); + accPruner.pruneBindingSetBin(new NodeBin(id, bin)); + for(String fluoId: fluoIds) { + fluoPruner.pruneBindingSetBin(new NodeBin(fluoId, bin)); + } + } catch (Exception e) { + log.trace("Could not successfully initialize PeriodicQueryBinPruner."); + } + } + + + public void shutdown() { + closed.set(true); + } + + private Set<String> getNodeIdsFromResultId(SnapshotBase sx, String id) { + Set<String> ids = new HashSet<>(); + PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, id, ids); + return ids; + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java new file mode 100644 index 0000000..1c11f96 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/pruner/PeriodicQueryPrunerExecutor.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.pruner; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.periodic.notification.api.LifeCycle; +import org.apache.rya.periodic.notification.api.NodeBin; + +import com.google.common.base.Preconditions; + +/** + * Executor service that runs {@link PeriodicQueryPruner}s with added functionality + * for starting, stopping, and determining if the query pruners are running. 
+ */ +public class PeriodicQueryPrunerExecutor implements LifeCycle { + + private static final Logger log = Logger.getLogger(PeriodicQueryPrunerExecutor.class); + private FluoClient client; + private int numThreads; + private ExecutorService executor; + private BlockingQueue<NodeBin> bins; + private PeriodicQueryResultStorage periodicStorage; + private List<PeriodicQueryPruner> pruners; + private boolean running = false; + + public PeriodicQueryPrunerExecutor(PeriodicQueryResultStorage periodicStorage, FluoClient client, int numThreads, + BlockingQueue<NodeBin> bins) { + Preconditions.checkArgument(numThreads > 0); + this.periodicStorage = periodicStorage; + this.numThreads = numThreads; + executor = Executors.newFixedThreadPool(numThreads); + this.bins = bins; + this.client = client; + this.pruners = new ArrayList<>(); + } + + @Override + public void start() { + if (!running) { + AccumuloBinPruner accPruner = new AccumuloBinPruner(periodicStorage); + FluoBinPruner fluoPruner = new FluoBinPruner(client); + + for (int threadNumber = 0; threadNumber < numThreads; threadNumber++) { + PeriodicQueryPruner pruner = new PeriodicQueryPruner(fluoPruner, accPruner, client, bins, threadNumber); + pruners.add(pruner); + executor.submit(pruner); + } + running = true; + } + } + + @Override + public void stop() { + if (pruners != null && pruners.size() > 0) { + pruners.forEach(x -> x.shutdown()); + } + if(client != null) { + client.close(); + } + if (executor != null) { + executor.shutdown(); + running = false; + } + try { + if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + log.info("Interrupted during shutdown, exiting uncleanly"); + } + } + + @Override + public boolean currentlyRunning() { + return running; + } + +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java new file mode 100644 index 0000000..8e8b1a2 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/recovery/PeriodicNotificationProvider.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.recovery; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.ColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.coordinator.PeriodicNotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; + +/** + * This class is used by the {@link PeriodicNotificationCoordinatorExecutor} + * to add all existing {@link PeriodicNotification}s stored in Fluo when it is + * initialized. This enables the the {@link PeriodicServiceApplication} to be + * recovered from failure by restoring it original state. + * + */ +public class PeriodicNotificationProvider { + + private FluoQueryMetadataDAO dao; + + public PeriodicNotificationProvider() { + this.dao = new FluoQueryMetadataDAO(); + } + + /** + * Retrieve all of the information about Periodic Query results already registered + * with Fluo. This is returned in the form of {@link CommandNotification}s that + * can be registered with the {@link NotificationCoordinatorExecutor}. 
+ * @param sx - snapshot for reading results from Fluo + * @return - collection of CommandNotifications that indicate Periodic Query information registered with system + */ + public Collection<CommandNotification> getNotifications(Snapshot sx) { + Set<PeriodicQueryMetadata> periodicMetadata = new HashSet<>(); + RowScanner scanner = sx.scanner().fetch(FluoQueryColumns.PERIODIC_QUERY_NODE_ID) + .over(Span.prefix(IncrementalUpdateConstants.PERIODIC_QUERY_PREFIX)).byRow().build(); + Iterator<ColumnScanner> colScannerIter = scanner.iterator(); + while (colScannerIter.hasNext()) { + ColumnScanner colScanner = colScannerIter.next(); + Iterator<ColumnValue> values = colScanner.iterator(); + while (values.hasNext()) { + PeriodicQueryMetadata metadata = dao.readPeriodicQueryMetadata(sx, values.next().getsValue()); + periodicMetadata.add(metadata); + } + } + return getCommandNotifications(sx, periodicMetadata); + } + + /** + * Registers all of Periodic Query information already contained within Fluo to the + * {@link NotificationCoordinatorExecutor}. 
+ * @param coordinator - coordinator that periodic info will be registered with + * @param sx - snapshot for reading results from Fluo + */ + public void processRegisteredNotifications(NotificationCoordinatorExecutor coordinator, Snapshot sx) { + coordinator.start(); + Collection<CommandNotification> notifications = getNotifications(sx); + for(CommandNotification notification: notifications) { + coordinator.processNextCommandNotification(notification); + } + } + + private Collection<CommandNotification> getCommandNotifications(Snapshot sx, Collection<PeriodicQueryMetadata> metadata) { + Set<CommandNotification> notifications = new HashSet<>(); + int i = 1; + for(PeriodicQueryMetadata meta:metadata) { + //offset initial wait to avoid overloading system + PeriodicNotification periodic = new PeriodicNotification(getQueryId(meta.getNodeId(), sx), meta.getPeriod(),TimeUnit.MILLISECONDS,i*5000); + notifications.add(new CommandNotification(Command.ADD, periodic)); + i++; + } + return notifications; + } + + private String getQueryId(String periodicNodeId, Snapshot sx) { + return getQueryIdFromPeriodicId(sx, periodicNodeId); + } + + private String getQueryIdFromPeriodicId(Snapshot sx, String nodeId) { + NodeType nodeType = NodeType.fromNodeId(nodeId).orNull(); + String id = null; + switch (nodeType) { + case FILTER: + id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.FILTER_PARENT_NODE_ID).toString()); + break; + case PERIODIC_QUERY: + id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.PERIODIC_QUERY_PARENT_NODE_ID).toString()); + break; + case QUERY: + id = sx.get(Bytes.of(nodeId), FluoQueryColumns.RYA_PCJ_ID).toString(); + break; + case AGGREGATION: + id = getQueryIdFromPeriodicId(sx, sx.get(Bytes.of(nodeId), FluoQueryColumns.AGGREGATION_PARENT_NODE_ID).toString()); + break; + case CONSTRUCT: + id = sx.get(Bytes.of(nodeId), FluoQueryColumns.CONSTRUCT_NODE_ID).toString(); + id = 
id.split(IncrementalUpdateConstants.CONSTRUCT_PREFIX)[1]; + break; + default: + throw new RuntimeException("Invalid NodeType."); + } + return id; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java new file mode 100644 index 0000000..f5cd13a --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationProvider.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.registration.kafka; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.rya.periodic.notification.api.LifeCycle; +import org.apache.rya.periodic.notification.api.Notification; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Consumer group to pull all requests for adding and deleting {@link Notification}s + * from Kafka. This Object executes {@link PeriodicNotificationConsumer}s that retrieve + * the {@link CommandNotification}s and register them with the {@link NotificationCoordinatorExecutor}. 
+ * + */ +public class KafkaNotificationProvider implements LifeCycle { + private static final Logger LOG = LoggerFactory.getLogger(KafkaNotificationProvider.class); + private String topic; + private ExecutorService executor; + private NotificationCoordinatorExecutor coord; + private Properties props; + private int numThreads; + private boolean running = false; + Deserializer<String> keyDe; + Deserializer<CommandNotification> valDe; + List<PeriodicNotificationConsumer> consumers; + + /** + * Create KafkaNotificationProvider for reading new notification requests form Kafka + * @param topic - notification topic + * @param keyDe - Kafka message key deserializer + * @param valDe - Kafka message value deserializer + * @param props - properties used to creates a {@link KafkaConsumer} + * @param coord - {@link NotificationCoordinatorExecutor} for managing and generating notifications + * @param numThreads - number of threads used by this notification provider + */ + public KafkaNotificationProvider(String topic, Deserializer<String> keyDe, Deserializer<CommandNotification> valDe, Properties props, + NotificationCoordinatorExecutor coord, int numThreads) { + this.coord = coord; + this.numThreads = numThreads; + this.topic = topic; + this.props = props; + this.consumers = new ArrayList<>(); + this.keyDe = keyDe; + this.valDe = valDe; + } + + @Override + public void stop() { + if (consumers != null && consumers.size() > 0) { + for (PeriodicNotificationConsumer consumer : consumers) { + consumer.shutdown(); + } + } + if (executor != null) { + executor.shutdown(); + } + running = false; + try { + if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { + LOG.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); + executor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.info("Interrupted during shutdown, exiting uncleanly"); + } + } + + public void start() { + if (!running) { + if (!coord.currentlyRunning()) { + coord.start(); + } + 
// now launch all the threads + executor = Executors.newFixedThreadPool(numThreads); + + // now create consumers to consume the messages + int threadNumber = 0; + for (int i = 0; i < numThreads; i++) { + LOG.info("Creating consumer:" + threadNumber); + KafkaConsumer<String, CommandNotification> consumer = new KafkaConsumer<String, CommandNotification>(props, keyDe, valDe); + PeriodicNotificationConsumer periodicConsumer = new PeriodicNotificationConsumer(topic, consumer, threadNumber, coord); + consumers.add(periodicConsumer); + executor.submit(periodicConsumer); + threadNumber++; + } + running = true; + } + } + + @Override + public boolean currentlyRunning() { + return running; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java new file mode 100644 index 0000000..ec94bb7 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/KafkaNotificationRegistrationClient.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.registration.kafka; + +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.rya.periodic.notification.api.Notification; +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient; +import org.apache.rya.periodic.notification.notification.BasicNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; + +/** + * Implementation of {@link PeriodicNotificationClient} used to register new notification + * requests with the PeriodicQueryService. 
+ * + */ +public class KafkaNotificationRegistrationClient implements PeriodicNotificationClient { + + private KafkaProducer<String, CommandNotification> producer; + private String topic; + + public KafkaNotificationRegistrationClient(String topic, KafkaProducer<String, CommandNotification> producer) { + this.topic = topic; + this.producer = producer; + } + + @Override + public void addNotification(PeriodicNotification notification) { + processNotification(new CommandNotification(Command.ADD, notification)); + + } + + @Override + public void deleteNotification(BasicNotification notification) { + processNotification(new CommandNotification(Command.DELETE, notification)); + } + + @Override + public void deleteNotification(String notificationId) { + processNotification(new CommandNotification(Command.DELETE, new BasicNotification(notificationId))); + } + + @Override + public void addNotification(String id, long period, long delay, TimeUnit unit) { + Notification notification = PeriodicNotification.builder().id(id).period(period).initialDelay(delay).timeUnit(unit).build(); + processNotification(new CommandNotification(Command.ADD, notification)); + } + + + private void processNotification(CommandNotification notification) { + producer.send(new ProducerRecord<String, CommandNotification>(topic, notification.getId(), notification)); + } + + @Override + public void close() { + producer.close(); + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java new file mode 100644 index 0000000..6785ce8 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/registration/kafka/PeriodicNotificationConsumer.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.registration.kafka; + +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.apache.log4j.Logger; +import org.apache.rya.periodic.notification.api.NotificationCoordinatorExecutor; +import org.apache.rya.periodic.notification.notification.CommandNotification; + +/** + * Consumer for the {@link KafkaNotificationProvider}. This consumer pull messages + * from Kafka and registers them with the {@link NotificationCoordinatorExecutor}. 
+ * + */ +public class PeriodicNotificationConsumer implements Runnable { + private KafkaConsumer<String, CommandNotification> consumer; + private int m_threadNumber; + private String topic; + private final AtomicBoolean closed = new AtomicBoolean(false); + private NotificationCoordinatorExecutor coord; + private static final Logger LOG = Logger.getLogger(PeriodicNotificationConsumer.class); + + /** + * Creates a new PeriodicNotificationConsumer for consuming new notification requests from + * Kafka. + * @param topic - new notification topic + * @param consumer - consumer for pulling new requests from Kafka + * @param a_threadNumber - number of consumer threads to be used + * @param coord - notification coordinator for managing and generating notifications + */ + public PeriodicNotificationConsumer(String topic, KafkaConsumer<String, CommandNotification> consumer, int a_threadNumber, + NotificationCoordinatorExecutor coord) { + this.topic = topic; + m_threadNumber = a_threadNumber; + this.consumer = consumer; + this.coord = coord; + } + + public void run() { + + try { + LOG.info("Creating kafka stream for consumer:" + m_threadNumber); + consumer.subscribe(Arrays.asList(topic)); + while (!closed.get()) { + ConsumerRecords<String, CommandNotification> records = consumer.poll(10000); + // Handle new records + for(ConsumerRecord<String, CommandNotification> record: records) { + CommandNotification notification = record.value(); + LOG.info("Thread " + m_threadNumber + " is adding notification " + notification + " to queue."); + LOG.info("Message: " + notification); + coord.processNextCommandNotification(notification); + } + } + } catch (WakeupException e) { + // Ignore exception if closing + if (!closed.get()) throw e; + } finally { + consumer.close(); + } + } + + public void shutdown() { + closed.set(true); + consumer.wakeup(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java new file mode 100644 index 0000000..bd29d29 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BasicNotificationTypeAdapter.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.serialization; + +import java.lang.reflect.Type; + +import org.apache.rya.periodic.notification.notification.BasicNotification; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * {@link TypeAdapter} for {@link BasicNotification}s. Used in {@link CommandNotificationTypeAdapter} to + * serialize {@link CommandNotification}s. + * + */ +public class BasicNotificationTypeAdapter implements JsonDeserializer<BasicNotification>, JsonSerializer<BasicNotification> { + + @Override + public JsonElement serialize(BasicNotification arg0, Type arg1, JsonSerializationContext arg2) { + JsonObject result = new JsonObject(); + result.add("id", new JsonPrimitive(arg0.getId())); + return result; + } + + @Override + public BasicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) throws JsonParseException { + JsonObject json = arg0.getAsJsonObject(); + String id = json.get("id").getAsString(); + return new BasicNotification(id); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java new file mode 100644 index 0000000..50180ad --- /dev/null +++ 
b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/BindingSetSerDe.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.periodic.notification.serialization; + +import java.io.UnsupportedEncodingException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.log4j.Logger; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjSerializer; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.openrdf.query.BindingSet; +import org.openrdf.query.algebra.evaluation.QueryBindingSet; + +import com.google.common.base.Joiner; +import com.google.common.primitives.Bytes; + +/** + * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming messages + * from Kafka. 
+ * + */ +public class BindingSetSerDe implements Serializer<BindingSet>, Deserializer<BindingSet> { + + private static final Logger log = Logger.getLogger(BindingSetSerDe.class); + private static final AccumuloPcjSerializer serializer = new AccumuloPcjSerializer(); + private static final byte[] DELIM_BYTE = "\u0002".getBytes(); + + private byte[] toBytes(BindingSet bindingSet) { + try { + return getBytes(getVarOrder(bindingSet), bindingSet); + } catch(Exception e) { + log.trace("Unable to serialize BindingSet: " + bindingSet); + return new byte[0]; + } + } + + private BindingSet fromBytes(byte[] bsBytes) { + try{ + int firstIndex = Bytes.indexOf(bsBytes, DELIM_BYTE); + byte[] varOrderBytes = Arrays.copyOf(bsBytes, firstIndex); + byte[] bsBytesNoVarOrder = Arrays.copyOfRange(bsBytes, firstIndex + 1, bsBytes.length); + VariableOrder varOrder = new VariableOrder(new String(varOrderBytes,"UTF-8").split(";")); + return getBindingSet(varOrder, bsBytesNoVarOrder); + } catch(Exception e) { + log.trace("Unable to deserialize BindingSet: " + bsBytes); + return new QueryBindingSet(); + } + } + + private VariableOrder getVarOrder(BindingSet bs) { + return new VariableOrder(bs.getBindingNames()); + } + + private byte[] getBytes(VariableOrder varOrder, BindingSet bs) throws UnsupportedEncodingException, BindingSetConversionException { + byte[] bsBytes = serializer.convert(bs, varOrder); + String varOrderString = Joiner.on(";").join(varOrder.getVariableOrders()); + byte[] varOrderBytes = varOrderString.getBytes("UTF-8"); + return Bytes.concat(varOrderBytes, DELIM_BYTE, bsBytes); + } + + private BindingSet getBindingSet(VariableOrder varOrder, byte[] bsBytes) throws BindingSetConversionException { + return serializer.convert(bsBytes, varOrder); + } + + @Override + public BindingSet deserialize(String topic, byte[] bytes) { + return fromBytes(bytes); + } + + @Override + public void close() { + // Do nothing. Nothing to close. 
+ } + + @Override + public void configure(Map<String, ?> arg0, boolean arg1) { + // Do nothing. Nothing to configure. + } + + @Override + public byte[] serialize(String topic, BindingSet bs) { + return toBytes(bs); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java new file mode 100644 index 0000000..302e1be --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializer.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.serialization; + +import java.io.UnsupportedEncodingException; +import java.util.Map; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.rya.periodic.notification.api.Notification; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * Kafka {@link Serializer} and {@link Deserializer} for producing and consuming {@link CommandNotification}s + * to and from Kafka. + * + */ +public class CommandNotificationSerializer implements Serializer<CommandNotification>, Deserializer<CommandNotification> { + + private static Gson gson = new GsonBuilder() + .registerTypeHierarchyAdapter(Notification.class, new CommandNotificationTypeAdapter()).create(); + private static final Logger LOG = LoggerFactory.getLogger(CommandNotificationSerializer.class); + + @Override + public CommandNotification deserialize(String topic, byte[] bytes) { + String json = null; + try { + json = new String(bytes, "UTF-8"); + } catch (UnsupportedEncodingException e) { + LOG.info("Unable to deserialize notification for topic: " + topic); + } + return gson.fromJson(json, CommandNotification.class); + } + + @Override + public byte[] serialize(String topic, CommandNotification command) { + try { + return gson.toJson(command).getBytes("UTF-8"); + } catch (UnsupportedEncodingException e) { + LOG.info("Unable to serialize notification: " + command + "for topic: " + topic); + throw new RuntimeException(e); + } + } + + @Override + public void close() { + // Do nothing. Nothing to close + } + + @Override + public void configure(Map<String, ?> arg0, boolean arg1) { + // Do nothing. 
Nothing to configure + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java new file mode 100644 index 0000000..a9fb7e1 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/CommandNotificationTypeAdapter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.serialization; + +import java.lang.reflect.Type; + +import org.apache.rya.periodic.notification.api.Notification; +import org.apache.rya.periodic.notification.notification.BasicNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * {@link TypeAdapter} used to serialize and deserialize {@link CommandNotification}s. + * This TypeAdapter is used in {@link CommandNotificationSerializer} for producing and + * consuming messages to and from Kafka. 
+ * + */ +public class CommandNotificationTypeAdapter + implements JsonDeserializer<CommandNotification>, JsonSerializer<CommandNotification> { + + @Override + public JsonElement serialize(CommandNotification arg0, Type arg1, JsonSerializationContext arg2) { + JsonObject result = new JsonObject(); + result.add("command", new JsonPrimitive(arg0.getCommand().name())); + Notification notification = arg0.getNotification(); + if (notification instanceof PeriodicNotification) { + result.add("type", new JsonPrimitive(PeriodicNotification.class.getSimpleName())); + PeriodicNotificationTypeAdapter adapter = new PeriodicNotificationTypeAdapter(); + result.add("notification", + adapter.serialize((PeriodicNotification) notification, PeriodicNotification.class, arg2)); + } else if (notification instanceof BasicNotification) { + result.add("type", new JsonPrimitive(BasicNotification.class.getSimpleName())); + BasicNotificationTypeAdapter adapter = new BasicNotificationTypeAdapter(); + result.add("notification", + adapter.serialize((BasicNotification) notification, BasicNotification.class, arg2)); + } else { + throw new IllegalArgumentException("Invalid notification type."); + } + return result; + } + + @Override + public CommandNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) + throws JsonParseException { + + JsonObject json = arg0.getAsJsonObject(); + Command command = Command.valueOf(json.get("command").getAsString()); + String type = json.get("type").getAsString(); + Notification notification = null; + if (type.equals(PeriodicNotification.class.getSimpleName())) { + notification = (new PeriodicNotificationTypeAdapter()).deserialize(json.get("notification"), + PeriodicNotification.class, arg2); + } else if (type.equals(BasicNotification.class.getSimpleName())) { + notification = (new BasicNotificationTypeAdapter()).deserialize(json.get("notification"), + BasicNotification.class, arg2); + } else { + throw new JsonParseException("Cannot 
deserialize Json"); + } + + return new CommandNotification(command, notification); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java new file mode 100644 index 0000000..fcc0ba2 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/main/java/org/apache/rya/periodic/notification/serialization/PeriodicNotificationTypeAdapter.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.serialization; + +import java.lang.reflect.Type; +import java.util.concurrent.TimeUnit; + +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.apache.rya.periodic.notification.notification.PeriodicNotification.Builder; + +import com.google.gson.JsonDeserializationContext; +import com.google.gson.JsonDeserializer; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParseException; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; + +/** + * {@link TypeAdapter} used to serialize and deserialize {@link PeriodicNotification}s. + * This TypeAdapter is used in {@link CommandNotificationTypeAdapter} which is used in + * {@link CommandNotificationSerializer} for producing and consuming messages to and from + * Kafka. + * + */ +public class PeriodicNotificationTypeAdapter + implements JsonSerializer<PeriodicNotification>, JsonDeserializer<PeriodicNotification> { + + @Override + public PeriodicNotification deserialize(JsonElement arg0, Type arg1, JsonDeserializationContext arg2) + throws JsonParseException { + + JsonObject json = arg0.getAsJsonObject(); + String id = json.get("id").getAsString(); + long period = json.get("period").getAsLong(); + TimeUnit periodTimeUnit = TimeUnit.valueOf(json.get("timeUnit").getAsString()); + long initialDelay = json.get("initialDelay").getAsLong(); + Builder builder = PeriodicNotification.builder().id(id).period(period) + .initialDelay(initialDelay).timeUnit(periodTimeUnit); + + return builder.build(); + } + + @Override + public JsonElement serialize(PeriodicNotification arg0, Type arg1, JsonSerializationContext arg2) { + + JsonObject result = new JsonObject(); + result.add("id", new JsonPrimitive(arg0.getId())); + result.add("period", new JsonPrimitive(arg0.getPeriod())); + result.add("initialDelay", new 
JsonPrimitive(arg0.getInitialDelay())); + result.add("timeUnit", new JsonPrimitive(arg0.getTimeUnit().name())); + + return result; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/periodic.service.notification/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/periodic.service.notification/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java b/extras/rya.periodic.service/periodic.service.notification/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java new file mode 100644 index 0000000..4aad1c6 --- /dev/null +++ b/extras/rya.periodic.service/periodic.service.notification/src/test/java/org/apache/rya/periodic/notification/serialization/CommandNotificationSerializerTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.periodic.notification.serialization; + +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.rya.periodic.notification.notification.BasicNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.notification.CommandNotification.Command; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.junit.Assert; +import org.junit.Test; + +/** Round-trip tests for {@link CommandNotificationSerializer}: each {@link CommandNotification} is serialized, deserialized again, and the result is expected to equal the original command. */ public class CommandNotificationSerializerTest { + + /** Serializer under test. */ private CommandNotificationSerializer serializer = new CommandNotificationSerializer(); + /** Topic name passed to every serialize and deserialize call in this test. */ private static final String topic = "topic"; + + /** Builds four ADD commands (three wrapping a {@link PeriodicNotification}, one wrapping a {@link BasicNotification}) and asserts that serializing then deserializing each one yields an object equal to the original. Every notification id is a freshly generated random UUID string. */ @Test + public void basicSerializationTest() { + /* Case 1: periodic notification with period 24, unit DAYS, initial delay 1. */ PeriodicNotification notification = PeriodicNotification.builder().id(UUID.randomUUID().toString()).period(24) + .timeUnit(TimeUnit.DAYS).initialDelay(1).build(); + CommandNotification command = new CommandNotification(Command.ADD, notification); + Assert.assertEquals(command, serializer.deserialize(topic,serializer.serialize(topic, command))); + + /* Case 2: periodic notification with period 32, unit SECONDS, initial delay 15. */ PeriodicNotification notification1 = PeriodicNotification.builder().id(UUID.randomUUID().toString()).period(32) + .timeUnit(TimeUnit.SECONDS).initialDelay(15).build(); + CommandNotification command1 = new CommandNotification(Command.ADD, notification1); + Assert.assertEquals(command1, serializer.deserialize(topic,serializer.serialize(topic,command1))); + + /* Case 3: same period, unit and initial delay as case 2; only the random id differs. NOTE(review): possibly an unintended duplicate of case 2 - confirm whether different values were meant here. */ PeriodicNotification notification2 = PeriodicNotification.builder().id(UUID.randomUUID().toString()).period(32) + .timeUnit(TimeUnit.SECONDS).initialDelay(15).build(); + CommandNotification command2 = new CommandNotification(Command.ADD, notification2); + Assert.assertEquals(command2, serializer.deserialize(topic,serializer.serialize(topic,command2))); + + /* Case 4: a BasicNotification constructed from an id alone, wrapped in an ADD command. */ BasicNotification notification3 = new BasicNotification(UUID.randomUUID().toString()); + 
CommandNotification command3 = new CommandNotification(Command.ADD, notification3); + Assert.assertEquals(command3, serializer.deserialize(topic,serializer.serialize(topic,command3))); + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/2ca85427/extras/rya.periodic.service/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.periodic.service/pom.xml b/extras/rya.periodic.service/pom.xml new file mode 100644 index 0000000..fce4996 --- /dev/null +++ b/extras/rya.periodic.service/pom.xml @@ -0,0 +1,39 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. 
+--> + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.periodic.service</artifactId> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.extras</artifactId> + <version>3.2.11-incubating-SNAPSHOT</version> + </parent> + + <name>Apache Rya Periodic Service</name> + <description>Parent POM for Rya Periodic Service</description> + + <packaging>pom</packaging> + + <modules> + <module>periodic.service.notification</module> + <module>periodic.service.integration.tests</module> + </modules> + +</project> \ No newline at end of file
