RYA-319-Integration of Periodic Query with CLI. Closes #220.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/63f87b86 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/63f87b86 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/63f87b86 Branch: refs/heads/master Commit: 63f87b868f33c718f085f5a7907d22b823dcd5d3 Parents: ad6ab01 Author: Caleb Meier <[email protected]> Authored: Mon Aug 7 21:22:00 2017 -0700 Committer: Caleb Meier <[email protected]> Committed: Mon Sep 4 06:11:12 2017 -0700 ---------------------------------------------------------------------- .../org/apache/rya/api/client/CreatePCJ.java | 4 +- .../rya/api/client/CreatePeriodicPCJ.java | 40 ++++ .../rya/api/client/DeletePeriodicPCJ.java | 38 ++++ .../rya/api/client/ListIncrementalQueries.java | 38 ++++ .../org/apache/rya/api/client/RyaClient.java | 31 +++ extras/indexing/pom.xml | 5 +- .../accumulo/AccumuloCreatePeriodicPCJ.java | 145 ++++++++++++ .../accumulo/AccumuloDeletePeriodicPCJ.java | 135 +++++++++++ .../AccumuloListIncrementalQueries.java | 101 +++++++++ .../accumulo/AccumuloRyaClientFactory.java | 3 + extras/rya.pcj.fluo/pcj.fluo.api/pom.xml | 4 + .../indexing/pcj/fluo/api/CreateFluoPcj.java | 24 +- .../pcj/fluo/api/CreatePeriodicQuery.java | 215 ++++++++++++++++++ .../pcj/fluo/api/DeletePeriodicQuery.java | 92 ++++++++ .../indexing/pcj/fluo/api/ListFluoQueries.java | 149 ++++++++++++ .../fluo/app/IncrementalUpdateConstants.java | 1 + .../pcj/fluo/app/export/ExporterManager.java | 23 +- .../pcj/fluo/app/export/NoOpExporter.java | 59 ----- .../KafkaBindingSetExporterParameters.java | 1 - .../export/rya/PeriodicBindingSetExporter.java | 2 +- .../fluo/app/observers/QueryResultObserver.java | 4 +- .../indexing/pcj/fluo/app/query/FluoQuery.java | 6 +- .../pcj/fluo/app/query/FluoQueryColumns.java | 1 + .../fluo/app/query/FluoQueryMetadataDAO.java | 6 +- .../pcj/fluo/api/ListFluoQueriesIT.java | 96 ++++++++ 
.../indexing/pcj/fluo/integration/BatchIT.java | 12 +- .../pcj/fluo/integration/CreateDeleteIT.java | 3 +- .../integration/CreateDeletePeriodicPCJ.java | 227 +++++++++++++++++++ .../pcj/fluo/integration/KafkaExportIT.java | 24 +- .../indexing/pcj/fluo/integration/QueryIT.java | 40 ++-- .../pcj/fluo/test/base/KafkaExportITBase.java | 17 +- .../periodic.service.api/.gitignore | 1 + .../periodic.service.api/pom.xml | 52 +++++ .../periodic/notification/api/BinPruner.java | 40 ++++ .../notification/api/BindingSetExporter.java | 37 +++ .../notification/api/BindingSetRecord.java | 80 +++++++ .../api/BindingSetRecordExportException.java | 45 ++++ .../periodic/notification/api/LifeCycle.java | 45 ++++ .../rya/periodic/notification/api/NodeBin.java | 77 +++++++ .../periodic/notification/api/Notification.java | 34 +++ .../api/NotificationCoordinatorExecutor.java | 41 ++++ .../notification/api/NotificationProcessor.java | 41 ++++ .../api/PeriodicNotificationClient.java | 64 ++++++ .../notification/BasicNotification.java | 76 +++++++ .../notification/CommandNotification.java | 99 ++++++++ .../notification/PeriodicNotification.java | 178 +++++++++++++++ .../notification/TimestampedNotification.java | 69 ++++++ .../KafkaNotificationRegistrationClient.java | 80 +++++++ .../BasicNotificationTypeAdapter.java | 55 +++++ .../serialization/BindingSetSerDe.java | 105 +++++++++ .../CommandNotificationSerializer.java | 76 +++++++ .../CommandNotificationTypeAdapter.java | 89 ++++++++ .../PeriodicNotificationTypeAdapter.java | 73 ++++++ .../periodic.service.integration.tests/pom.xml | 29 +-- .../PeriodicNotificationApplicationIT.java | 102 ++++----- .../PeriodicNotificationProviderIT.java | 5 +- .../PeriodicNotificationExporterIT.java | 1 + .../PeriodicNotificationProcessorIT.java | 2 +- .../pruner/PeriodicNotificationBinPrunerIT.java | 7 +- .../PeriodicCommandNotificationConsumerIT.java | 31 ++- .../periodic.service.notification/pom.xml | 201 ++++++++-------- 
.../periodic/notification/api/BinPruner.java | 40 ---- .../notification/api/BindingSetExporter.java | 38 ---- .../notification/api/CreatePeriodicQuery.java | 124 ---------- .../periodic/notification/api/LifeCycle.java | 45 ---- .../rya/periodic/notification/api/NodeBin.java | 77 ------- .../periodic/notification/api/Notification.java | 34 --- .../api/NotificationCoordinatorExecutor.java | 41 ---- .../notification/api/NotificationProcessor.java | 41 ---- .../api/PeriodicNotificationClient.java | 64 ------ .../PeriodicNotificationApplication.java | 2 +- .../PeriodicNotificationApplicationFactory.java | 2 +- .../notification/exporter/BindingSetRecord.java | 80 ------- .../exporter/KafkaExporterExecutor.java | 1 + .../KafkaPeriodicBindingSetExporter.java | 9 +- .../notification/BasicNotification.java | 76 ------- .../notification/CommandNotification.java | 99 -------- .../notification/PeriodicNotification.java | 178 --------------- .../notification/TimestampedNotification.java | 69 ------ .../NotificationProcessorExecutor.java | 2 +- .../TimestampedNotificationProcessor.java | 2 +- .../KafkaNotificationRegistrationClient.java | 80 ------- .../BasicNotificationTypeAdapter.java | 55 ----- .../serialization/BindingSetSerDe.java | 105 --------- .../CommandNotificationSerializer.java | 76 ------- .../CommandNotificationTypeAdapter.java | 89 -------- .../PeriodicNotificationTypeAdapter.java | 73 ------ extras/rya.periodic.service/pom.xml | 1 + .../org/apache/rya/shell/RyaAdminCommands.java | 81 ++++++- .../apache/rya/shell/RyaAdminCommandsTest.java | 65 ++++++ pom.xml | 20 ++ 91 files changed, 3235 insertions(+), 1815 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java ---------------------------------------------------------------------- diff --git 
a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java index 6e92b28..3c369d8 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java @@ -28,7 +28,7 @@ import edu.umd.cs.findbugs.annotations.NonNull; */ @DefaultAnnotation(NonNull.class) public interface CreatePCJ { - + /** * Metadata enum used to indicate the type of query that is registered. If * the topmost node is a Construct QueryNode, then the type is Construct. If the @@ -44,7 +44,7 @@ public interface CreatePCJ { * Application. * */ - public static enum ExportStrategy{RYA, KAFKA, NO_OP_EXPORT}; + public static enum ExportStrategy{RYA, KAFKA, PERIODIC}; /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java new file mode 100644 index 0000000..7c006d0 --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePeriodicPCJ.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client; + +/** + * This class creates new PeriodicPCJ for a given Rya instance. + */ +public interface CreatePeriodicPCJ { + + /** + * Creates a new PeriodicPCJ for a given Rya instance. The provided periodicTopic and bootStrapServers are used for + * registering new PeriodiNotifications with the underlying notification registration service. Typically, the + * bootStrapServers are the IP for the KafkaBrokers. + * + * @param instanceName - Rya instance to connect to + * @param sparql - SPARQL query registered with the Periodic Service + * @param periodicTopic - Kafka topic that new PeriodicNotifications are exported to for registration with the + * PeriodicService + * @param bootStrapServers - Connection string for Kafka brokers + * @return Fluo Query Id of the registered Periodic Query + */ + public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException; + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java new file mode 100644 index 0000000..c30afd2 --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/DeletePeriodicPCJ.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client; + +/** + * Deletes and instance of a Periodic PCJ from Rya + */ +public interface DeletePeriodicPCJ { + + /** + * Deletes a PCJ from an instance of Rya. + * + * @param instanceName - Indicates which Rya instance is maintaining the Periodic PCJ. (not null) + * @param pcjId - The ID of the Periodic PCJ that will be deleted. (not null) + * @param topic - Kafka topic for deleteing PeriodicNotifications + * @param brokers - Comma delimited host/port pairs for connecting to Kafka brokers. + * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name. + * @throws RyaClientException Something caused the command to fail. 
+ */ + public void deletePeriodicPCJ(String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException; + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java b/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java new file mode 100644 index 0000000..75e1297 --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/ListIncrementalQueries.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client; + +/** + * Verifies that Rya instance has Fluo application enabled and lists + * all SPARQL queries maintained by the applcation. 
+ */ +public interface ListIncrementalQueries { + + /** + * Lists all SPARQL queries maintained by the Fluo Application for a given rya instance and associated information, + * including the Fluo Query Id, the QueryType, the ExportStrategy, and the pretty-printed SPARQL query. + * + * @param ryaInstance - Rya instance whose queries are incrementally maintained by Fluo + * @return String comprised of new line delimited Strings that provide information about each query registered in + * Fluo, including the query Id, the query type, the export strategies, and the SPARQL query + * @throws RyaClientException + */ + public String listIncrementalQueries(String ryaInstance) throws RyaClientException; + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java index d1481dc..c04bd86 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java @@ -34,6 +34,9 @@ public class RyaClient { private final Install install; private final CreatePCJ createPcj; private final DeletePCJ deletePcj; + private final CreatePeriodicPCJ createPeriodicPcj; + private final DeletePeriodicPCJ deletePeriodicPcj; + private final ListIncrementalQueries listIncrementalQueries; private final BatchUpdatePCJ bactchUpdatePCJ; private final GetInstanceDetails getInstanceDetails; private final InstanceExists instanceExists; @@ -51,6 +54,9 @@ public class RyaClient { final Install install, final CreatePCJ createPcj, final DeletePCJ deletePcj, + final CreatePeriodicPCJ createPeriodicPcj, + final DeletePeriodicPCJ deletePeriodicPcj, + final ListIncrementalQueries listIncrementalQueries, final BatchUpdatePCJ 
batchUpdatePcj, final GetInstanceDetails getInstanceDetails, final InstanceExists instanceExists, @@ -63,6 +69,9 @@ public class RyaClient { this.install = requireNonNull(install); this.createPcj = requireNonNull(createPcj); this.deletePcj = requireNonNull(deletePcj); + this.createPeriodicPcj = createPeriodicPcj; + this.deletePeriodicPcj = deletePeriodicPcj; + this.listIncrementalQueries = listIncrementalQueries; this.bactchUpdatePCJ = requireNonNull(batchUpdatePcj); this.getInstanceDetails = requireNonNull(getInstanceDetails); this.instanceExists = requireNonNull(instanceExists); @@ -96,8 +105,30 @@ public class RyaClient { public DeletePCJ getDeletePCJ() { return deletePcj; } + + /** + * @return An instance of {@link CreatePeridodicPCJ} that is connected to a Rya Periodic Storage + */ + public CreatePeriodicPCJ getCreatePeriodicPCJ() { + return createPeriodicPcj; + } /** + * @return An instance of {@link DeletePeriodicPCJ} that is connected to a Rya Periodic Storage + */ + public DeletePeriodicPCJ getDeletePeriodicPCJ() { + return deletePeriodicPcj; + } + + /** + * @return An instance of {@link ListIncrementalQueries} for displaying queries that are incrementallly + * maintained by the Rya instance + */ + public ListIncrementalQueries getListIncrementalQueries() { + return listIncrementalQueries; + } + + /** * @return An instance of {@link BatchUpdatePCJ} that is connect to a Rya storage * if the Rya instance supports PCJ indexing. 
*/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/pom.xml ---------------------------------------------------------------------- diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml index 7961b9f..16a205f 100644 --- a/extras/indexing/pom.xml +++ b/extras/indexing/pom.xml @@ -81,7 +81,10 @@ <groupId>org.apache.rya</groupId> <artifactId>rya.pcj.fluo.api</artifactId> </dependency> - + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service.api</artifactId> + </dependency> <!-- OpenRDF --> <dependency> <groupId>org.openrdf.sesame</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java new file mode 100644 index 0000000..26a25da --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePeriodicPCJ.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; + +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.api.client.CreatePeriodicPCJ; +import org.apache.rya.api.client.GetInstanceDetails; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery; +import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery.PeriodicQueryCreationException; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.SailException; + +import 
com.google.common.base.Optional; + +/** + * Class used by the RyaClient for creating Periodic PCJ. + * + */ +public class AccumuloCreatePeriodicPCJ extends AccumuloCommand implements CreatePeriodicPCJ { + + private final GetInstanceDetails getInstanceDetails; + + /** + * Constructs an instance of {@link AccumuloCreatePeriodicPCJ}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloCreatePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector); + } + + @Override + public String createPeriodicPCJ(String instanceName, String sparql, String periodicTopic, String bootStrapServers) throws RyaClientException { + requireNonNull(instanceName); + requireNonNull(sparql); + + final Optional<RyaDetails> ryaDetailsHolder = getInstanceDetails.getDetails(instanceName); + final boolean ryaInstanceExists = ryaDetailsHolder.isPresent(); + if (!ryaInstanceExists) { + throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName)); + } + + final PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails(); + final boolean pcjIndexingEnabeld = pcjIndexDetails.isEnabled(); + if (!pcjIndexingEnabeld) { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); + } + + // If a Fluo application is being used, task it with updating the PCJ. 
+ final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); + if (fluoDetailsHolder.isPresent()) { + final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); + try { + return updateFluoAppAndRegisterWithKafka(instanceName, fluoAppName, sparql, periodicTopic, bootStrapServers); + } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException + | RyaDAOException | PeriodicQueryCreationException e) { + throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); + } catch (UnsupportedQueryException e) { + throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node " + + "or an invalid ExportStrategy for the given QueryType. Projection queries can be exported to either Rya or Kafka," + + "unless they contain an aggregation, in which case they can only be exported to Kafka. Construct queries can be exported" + + "to Rya and Kafka, and Periodic queries can only be exported to Rya."); + } + } else { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); + } + } + + + + + private String updateFluoAppAndRegisterWithKafka(final String ryaInstance, final String fluoAppName, String sparql, String periodicTopic, String bootStrapServers) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException, PeriodicQueryCreationException { + requireNonNull(sparql); + requireNonNull(periodicTopic); + requireNonNull(bootStrapServers); + + final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance); + + // Connect to the Fluo application that is updating this instance's PCJs. 
+ final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); + try(final FluoClient fluoClient = new FluoClientFactory().connect( + cd.getUsername(), + new String(cd.getPassword()), + cd.getInstanceName(), + cd.getZookeepers(), + fluoAppName);) { + // Initialize the PCJ within the Fluo application. + final CreatePeriodicQuery periodicPcj = new CreatePeriodicQuery(fluoClient, periodicStorage); + PeriodicNotificationClient periodicClient = new KafkaNotificationRegistrationClient(periodicTopic, createProducer(bootStrapServers)); + return periodicPcj.withRyaIntegration(sparql, periodicClient, getConnector(), ryaInstance).getQueryId(); + } + } + + + private static KafkaProducer<String, CommandNotification> createProducer(String bootStrapServers) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName()); + return new KafkaProducer<>(props); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java new file mode 100644 index 0000000..18e49dc --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePeriodicPCJ.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Properties; + +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.rya.api.client.DeletePeriodicPCJ; +import org.apache.rya.api.client.GetInstanceDetails; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery; +import org.apache.rya.indexing.pcj.fluo.api.DeletePeriodicQuery.QueryDeletionException; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage; +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient; +import org.apache.rya.periodic.notification.notification.CommandNotification; +import 
org.apache.rya.periodic.notification.registration.KafkaNotificationRegistrationClient; +import org.apache.rya.periodic.notification.serialization.CommandNotificationSerializer; +import org.openrdf.query.MalformedQueryException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; + +/** + * Class used by the RyaClient and Rya Shell for deleting Periodic PCJ. + * + */ +public class AccumuloDeletePeriodicPCJ extends AccumuloCommand implements DeletePeriodicPCJ { + private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePCJ.class); + + private final GetInstanceDetails getInstanceDetails; + + /** + * Constructs an instance of {@link AccumuloDeletePeriodicPCJ}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programmatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloDeletePeriodicPCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector); + } + + @Override + public void deletePeriodicPCJ(final String instanceName, final String pcjId, String topic, String brokers) throws InstanceDoesNotExistException, RyaClientException { + requireNonNull(instanceName); + requireNonNull(pcjId); + + final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName); + final boolean ryaInstanceExists = originalDetails.isPresent(); + if(!ryaInstanceExists) { + throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName)); + } + + final boolean pcjIndexingEnabled = originalDetails.get().getPCJIndexDetails().isEnabled(); + if(!pcjIndexingEnabled) { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", 
instanceName)); + } + + // If the PCJ was being maintained by a Fluo application, then stop that process. + final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails(); + final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); + + if (fluoDetailsHolder.isPresent()) { + final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName(); + try { + stopUpdatingPCJ(instanceName, fluoAppName, pcjId, topic, brokers); + } catch (MalformedQueryException | UnsupportedQueryException | QueryDeletionException e) { + throw new RyaClientException(String.format("Unable to delete Periodic Query with id: %s", pcjId), e); + } + } else { + log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are " + + "missing for the Rya instance named '%s'.", instanceName)); + } + + } + + + private void stopUpdatingPCJ(final String ryaInstance, final String fluoAppName, final String pcjId, final String topic, final String brokers) throws UnsupportedQueryException, MalformedQueryException, QueryDeletionException { + requireNonNull(fluoAppName); + requireNonNull(pcjId); + + // Connect to the Fluo application that is updating this instance's PCJs. + final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); + try (final FluoClient fluoClient = new FluoClientFactory().connect(cd.getUsername(), new String(cd.getPassword()), + cd.getInstanceName(), cd.getZookeepers(), fluoAppName)) { + // Delete the PCJ from the Fluo App. 
+ PeriodicQueryResultStorage periodic = new AccumuloPeriodicQueryResultStorage(getConnector(), ryaInstance); + DeletePeriodicQuery deletePeriodic = new DeletePeriodicQuery(fluoClient, periodic); + deletePeriodic.deletePeriodicQuery(pcjId, getPeriodicNotificationClient(topic, brokers)); + } + } + + + private static PeriodicNotificationClient getPeriodicNotificationClient(String topic, String brokers) throws MalformedQueryException { + return new KafkaNotificationRegistrationClient(topic, createProducer(brokers)); + } + + private static KafkaProducer<String, CommandNotification> createProducer(String brokers) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CommandNotificationSerializer.class.getName()); + return new KafkaProducer<>(props); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java new file mode 100644 index 0000000..51e7d6a --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloListIncrementalQueries.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.List; + +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.rya.api.client.GetInstanceDetails; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.ListIncrementalQueries; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import org.apache.rya.indexing.pcj.fluo.api.ListFluoQueries; + +import com.google.common.base.Joiner; +import com.google.common.base.Optional; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +@DefaultAnnotation(NonNull.class) +public class AccumuloListIncrementalQueries extends AccumuloCommand implements ListIncrementalQueries { + + private final GetInstanceDetails getInstanceDetails; + + public AccumuloListIncrementalQueries(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector); + } + + @Override + public String listIncrementalQueries(String instanceName) throws RyaClientException { + + requireNonNull(instanceName); + + final Optional<RyaDetails> ryaDetailsHolder = 
getInstanceDetails.getDetails(instanceName); + final boolean ryaInstanceExists = ryaDetailsHolder.isPresent(); + if (!ryaInstanceExists) { + throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName)); + } + + final PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails(); + final boolean pcjIndexingEnabeld = pcjIndexDetails.isEnabled(); + if (!pcjIndexingEnabeld) { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); + } + + // If a Fluo application is being used, ask it for the queries it is maintaining. + final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); + if (fluoDetailsHolder.isPresent()) { + final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); + try { + return getFluoQueryString(instanceName, fluoAppName); + } catch (Exception e) { + throw new RyaClientException("Problem while creating Fluo Query Strings.", e); + } + } else { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have Fluo incremental updating enabled.", instanceName)); + } + } + + + private String getFluoQueryString(final String ryaInstance, final String fluoAppName) throws Exception { + + // Connect to the Fluo application that is updating this instance's PCJs. + final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); + try(final FluoClient fluoClient = new FluoClientFactory().connect( + cd.getUsername(), + new String(cd.getPassword()), + cd.getInstanceName(), + cd.getZookeepers(), + fluoAppName);) { + // List the queries maintained by the Fluo application.
+ ListFluoQueries listQueries = new ListFluoQueries(); + List<String> queries = listQueries.listFluoQueries(fluoClient); + return Joiner.on("\n").join(queries); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java index 5ee02f9..d9bf644 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java @@ -52,6 +52,9 @@ public class AccumuloRyaClientFactory { new AccumuloInstall(connectionDetails, connector), new AccumuloCreatePCJ(connectionDetails, connector), new AccumuloDeletePCJ(connectionDetails, connector), + new AccumuloCreatePeriodicPCJ(connectionDetails, connector), + new AccumuloDeletePeriodicPCJ(connectionDetails, connector), + new AccumuloListIncrementalQueries(connectionDetails, connector), new AccumuloBatchUpdatePCJ(connectionDetails, connector), new AccumuloGetInstanceDetails(connectionDetails, connector), new AccumuloInstanceExists(connectionDetails, connector), http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml index 758c481..16d33b2 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml +++ b/extras/rya.pcj.fluo/pcj.fluo.api/pom.xml @@ -42,6 +42,10 @@ under the License. 
<artifactId>rya.pcj.fluo.app</artifactId> </dependency> <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.periodic.service.api</artifactId> + </dependency> + <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.sail</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java index 501f1f5..a988bc7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java @@ -38,6 +38,7 @@ import org.apache.log4j.Logger; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaType; import org.apache.rya.api.domain.RyaURI; @@ -135,6 +136,7 @@ public class CreateFluoPcj { * according to the Kafka {@link ExportStrategy}. * * @param sparql - sparql query String to be registered with Fluo + * @param strategies - ExportStrategies used to specify how final results will be handled * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) * @return The metadata that was written to the Fluo application for the PCJ. * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. 
@@ -218,7 +220,17 @@ public class CreateFluoPcj { .setSparql(sparql) .setJoinBatchSize(joinBatchSize); - return builder.build(); + FluoQuery query = builder.build(); + + if(query.getQueryType() == QueryType.PERIODIC && !Sets.newHashSet(ExportStrategy.PERIODIC).containsAll(strategies)) { + throw new UnsupportedQueryException("Periodic Queries must only utilize the PeriodicExport or the NoOpExport ExportStrategy."); + } + + if(query.getQueryType() != QueryType.PERIODIC && strategies.contains(ExportStrategy.PERIODIC)) { + throw new UnsupportedQueryException("Only Periodic Queries can utilize the PeriodicExport ExportStrategy."); + } + + return query; } private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) { @@ -283,13 +295,13 @@ public class CreateFluoPcj { * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) * @param accumulo - Accumulo connector for connecting with Accumulo * @param ryaInstance - name of Rya instance to connect to - * @return The Fluo application's Query ID of the query that was created. + * @return FluoQuery containing the metadata for the newly registered SPARQL query * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. * @throws UnsupportedQueryException */ - public String withRyaIntegration( + public FluoQuery withRyaIntegration( final String pcjId, final String sparql, final Set<ExportStrategy> strategies, @@ -308,7 +320,7 @@ public class CreateFluoPcj { //import results already ingested into Rya that match query importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance); // return queryId to the caller for later monitoring from the export. 
- return fluoQuery.getQueryMetadata().getNodeId(); + return fluoQuery; } /** @@ -326,13 +338,13 @@ public class CreateFluoPcj { * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) * @param accumulo - Accumuo connector for connecting to Accumulo * @param ryaInstance - name of Rya instance to connect to - * @return The Fluo application's Query ID of the query that was created. + * @return FluoQuery containing the metadata for the newly registered SPARQL query * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. * @throws UnsupportedQueryException */ - public String withRyaIntegration( + public FluoQuery withRyaIntegration( final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java new file mode 100644 index 0000000..24adde9 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePeriodicQuery.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import java.util.Optional; + +import org.apache.accumulo.core.client.Connector; +import org.apache.fluo.api.client.FluoClient; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryNode; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient; +import org.apache.rya.periodic.notification.notification.PeriodicNotification; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.evaluation.function.Function; + +import com.google.common.collect.Sets; + + +/** + * Object that creates a Periodic Query. A Periodic Query is any query + * requesting periodic updates about events that occurred within a given + * window of time of this instant. This is also known as a rolling window + * query. 
Periodic Queries can be expressed using SPARQL by including the + * {@link Function} indicated by the URI {@link PeriodicQueryUtil#PeriodicQueryURI} + * in the query. The user must provide this Function with the following arguments: + * the temporal variable in the query that will be filtered on, the window of time + * that events must occur within, the period at which the user wants to receive updates, + * and the time unit. The following query requests all observations that occurred + * within the last minute and requests updates every 15 seconds. It also performs + * a count on those observations. + * <p> + * <pre> + * prefix function: http://org.apache.rya/function# + * prefix time: http://www.w3.org/2006/time# + * select (count(?obs) as ?total) where { + * Filter(function:periodic(?time, 1, .25, time:minutes)) + * ?obs uri:hasTime ?time. + * ?obs uri:hasId ?id } + * </pre> + * <p> + * This class is responsible for taking a Periodic Query expressed as a SPARQL query + * and adding it to Fluo and Kafka so that it can be processed by the {@link PeriodicNotificationApplication}. + */ +public class CreatePeriodicQuery { + + private FluoClient fluoClient; + private PeriodicQueryResultStorage periodicStorage; + + + /** + * Constructs an instance of CreatePeriodicQuery for creating periodic queries. An instance + * of CreatePeriodicQuery that is created using this constructor will not publish new PeriodicNotifications + * to Kafka. + * + * @param fluoClient - Fluo client for interacting with Fluo + * @param periodicStorage - PeriodicQueryResultStorage storing periodic query results + */ + public CreatePeriodicQuery(FluoClient fluoClient, PeriodicQueryResultStorage periodicStorage) { + this.fluoClient = fluoClient; + this.periodicStorage = periodicStorage; + } + + + /** + * Creates a Periodic Query by adding the query to Fluo and using the resulting + * Fluo id to create a {@link PeriodicQueryResultStorage} table.
+ * + * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table + * @return FluoQuery indicating the metadata of the registered SPARQL query + */ + public FluoQuery createPeriodicQuery(String sparql) throws PeriodicQueryCreationException { + try { + Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql); + if(optNode.isPresent()) { + String pcjId = FluoQueryUtils.createNewPcjId(); + + //register query with Fluo + CreateFluoPcj createPcj = new CreateFluoPcj(); + FluoQuery fluoQuery = createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluoClient); + + //register query with PeriodicResultStorage table + periodicStorage.createPeriodicQuery(pcjId, sparql); + + return fluoQuery; + } else { + throw new RuntimeException("Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter."); + } + } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) { + throw new PeriodicQueryCreationException(e); + } + } + + + /** + * Creates a Periodic Query by adding the query to Fluo and using the resulting + * Fluo id to create a {@link PeriodicQueryResultStorage} table. Additionally, + * the associated PeriodicNotification is registered with the Periodic Query Service. 
+ * + * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table + * @param notificationClient - {@link PeriodicNotificationClient} for registering new PeriodicNotifications + * @return FluoQuery indicating the metadata of the registered SPARQL query + */ + public FluoQuery createPeriodicQuery(String sparql, PeriodicNotificationClient notificationClient) throws PeriodicQueryCreationException { + try { + Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql); + if(optNode.isPresent()) { + PeriodicQueryNode periodicNode = optNode.get(); + String pcjId = FluoQueryUtils.createNewPcjId(); + + //register query with Fluo + CreateFluoPcj createPcj = new CreateFluoPcj(); + FluoQuery fluoQuery = createPcj.createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), fluoClient); + + //register query with PeriodicResultStorage table + periodicStorage.createPeriodicQuery(pcjId, sparql); + //create notification + PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod()) + .timeUnit(periodicNode.getUnit()).build(); + //register notification with periodic notification app + notificationClient.addNotification(notification); + + return fluoQuery; + } else { + throw new RuntimeException("Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter."); + } + } catch (MalformedQueryException | PeriodicQueryStorageException | UnsupportedQueryException e) { + throw new PeriodicQueryCreationException(e); + } + } + + + /** + * Creates a Periodic Query by adding the query to Fluo and using the resulting + * Fluo id to create a {@link PeriodicQueryResultStorage} table. 
+ * @param sparql - sparql query registered to Fluo whose results are stored in PeriodicQueryResultStorage table + * @param notificationClient - {@link PeriodicNotificationClient} for registering new PeriodicNotifications + * @param conn - Accumulo connector for connecting to the Rya instance + * @param ryaInstance - name of the Accumulo backed Rya instance + * @return FluoQuery indicating the metadata of the registered SPARQL query + */ + public FluoQuery withRyaIntegration(String sparql, PeriodicNotificationClient notificationClient, Connector conn, String ryaInstance) + throws PeriodicQueryCreationException { + try { + Optional<PeriodicQueryNode> optNode = PeriodicQueryUtil.getPeriodicNode(sparql); + if (optNode.isPresent()) { + PeriodicQueryNode periodicNode = optNode.get(); + String pcjId = FluoQueryUtils.createNewPcjId(); + + // register query with Fluo + CreateFluoPcj createPcj = new CreateFluoPcj(); + FluoQuery fluoQuery = createPcj.withRyaIntegration(pcjId, sparql, Sets.newHashSet(ExportStrategy.PERIODIC), + fluoClient, conn, ryaInstance); + + // register query with PeriodicResultStorage table + periodicStorage.createPeriodicQuery(pcjId, sparql); + // create notification + PeriodicNotification notification = PeriodicNotification.builder().id(pcjId).period(periodicNode.getPeriod()) + .timeUnit(periodicNode.getUnit()).build(); + // register notification with periodic notification app + notificationClient.addNotification(notification); + + return fluoQuery; + } else { + throw new RuntimeException("Invalid PeriodicQuery. Query must possess a PeriodicQuery Filter."); + } + } catch (Exception e) { + throw new PeriodicQueryCreationException(e); + } + } + + /** + * This Exception gets thrown whenever there is an issue creating a PeriodicQuery.
+ * + */ + public static class PeriodicQueryCreationException extends Exception { + + private static final long serialVersionUID = 1L; + + public PeriodicQueryCreationException(Exception e) { + super(e); + } + + public PeriodicQueryCreationException(String message, Exception e) { + super(message, e); + } + + public PeriodicQueryCreationException(String message) { + super(message); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java new file mode 100644 index 0000000..4ff88da --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePeriodicQuery.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.api; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage; +import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException; +import org.apache.rya.periodic.notification.api.PeriodicNotificationClient; +import org.apache.rya.periodic.notification.notification.BasicNotification; + +import com.google.common.base.Preconditions; + +public class DeletePeriodicQuery { + + private FluoClient fluo; + private PeriodicQueryResultStorage periodicStorage; + + public DeletePeriodicQuery(FluoClient fluo, PeriodicQueryResultStorage periodicStorage) { + this.fluo = fluo; + this.periodicStorage = periodicStorage; + } + + /** + * Deletes the Periodic Query with the indicated pcjId from Fluo and {@link PeriodicQueryResultStorage}. + * @param pcjId - Id of the Periodic Query to be deleted + */ + public void deletePeriodicQuery(String pcjId) throws QueryDeletionException { + + Preconditions.checkNotNull(pcjId); + + DeleteFluoPcj deletePcj = new DeleteFluoPcj(1000); + try { + deletePcj.deletePcj(fluo, pcjId); + periodicStorage.deletePeriodicQuery(pcjId); + } catch (UnsupportedQueryException | PeriodicQueryStorageException e) { + throw new QueryDeletionException(String.format("Unable to delete the Periodic Query with Id: %s", pcjId), e); + } + + } + + /** + * Deletes the Periodic Query with the indicated pcjId from Fluo and {@link PeriodicQueryResultStorage}. In + * addition, this method also informs the Periodic Notification Service to stop generating PeriodicNotifications + * associated with the Periodic Query. 
+ * + * @param pcjId - Id of the Periodic Query to be deleted + * @param periodicClient - Client used to inform the Periodic Notification Service to stop generating notifications + * @throws QueryDeletionException if the Periodic Query cannot be deleted from Fluo or the PeriodicQueryResultStorage + */ + public void deletePeriodicQuery(String pcjId, PeriodicNotificationClient periodicClient) throws QueryDeletionException { + + Preconditions.checkNotNull(periodicClient); + + deletePeriodicQuery(pcjId); + periodicClient.deleteNotification(new BasicNotification(pcjId)); + } + + /** + * This Exception is thrown when a problem is encountered while deleting a + * query from the Fluo Application or the underlying storage layer. + */ + public static class QueryDeletionException extends Exception { + + private static final long serialVersionUID = 1L; + + public QueryDeletionException(String message) { + super(message); + } + + public QueryDeletionException(String message, Exception e) { + super(message, e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java new file mode 100644 index 0000000..8f5bbfe --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListFluoQueries.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.queryrender.sparql.SPARQLQueryRenderer; + +import com.google.common.base.Preconditions; + +/** + * Class for retrieving a List containing a String representation of each query maintained by Fluo. + * + */ +public class ListFluoQueries { + + private static final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + + /** + * Retrieve a list of String representations of each query maintained by Fluo + * + * @param fluo - FluoClient for interacting with Fluo + * @return - List of String representations of queries maintained by Fluo. 
+ * @throws Exception + */ + public List<String> listFluoQueries(FluoClient fluo) throws Exception { + + List<String> queryStrings = new ArrayList<>(); + Snapshot sx = fluo.newSnapshot(); + + List<String> ids = new ListQueryIds().listQueryIds(fluo); + for (String id : ids) { + queryStrings.add(extractString(dao.readQueryMetadata(sx, id))); + } + + return queryStrings; + } + + private static String extractString(QueryMetadata metadata) throws Exception { + FluoQueryStringBuilder builder = new FluoQueryStringBuilder(); + return builder.setQueryId(metadata.getNodeId()).setQueryType(metadata.getQueryType()) + .setExportStrategies(metadata.getExportStrategies()).setQuery(metadata.getSparql()).build(); + } + + private static String getPrettyPrintSparql(String sparql, int indent) throws Exception { + SPARQLParser parser = new SPARQLParser(); + ParsedQuery pq = parser.parseQuery(sparql, null); + SPARQLQueryRenderer render = new SPARQLQueryRenderer(); + String renderedQuery = render.render(pq); + + //remove extra quotes generated by query renderer + String[] splitRender = renderedQuery.split("\"\"\""); + StringBuilder builder = new StringBuilder(); + for(String s: splitRender) { + builder.append(s).append("\""); + } + builder.replace(builder.length() - 1, builder.length(), ""); + + //add indent to all lines following newline char + String[] newLineRender = builder.toString().split("\n"); + builder = new StringBuilder(); + String prefix = getVariableIndent(indent); + for(int i = 0; i < newLineRender.length; i++) { + if(i != 0) { + builder.append(prefix); + } + builder.append(newLineRender[i]).append("\n"); + } + + return builder.toString(); + } + + private static String getVariableIndent(int len) { + return new String(new char[len]).replace('\0', ' '); + } + + public static class FluoQueryStringBuilder { + + private String queryId; + private String sparql; + private QueryType queryType; + private Set<ExportStrategy> strategies; + + public FluoQueryStringBuilder 
setQueryId(String queryId) { + this.queryId = Preconditions.checkNotNull(queryId); + return this; + } + + public FluoQueryStringBuilder setQuery(String query) { + this.sparql = Preconditions.checkNotNull(query); + return this; + } + + public FluoQueryStringBuilder setExportStrategies(Set<ExportStrategy> strategies) { + this.strategies = Preconditions.checkNotNull(strategies); + return this; + } + + public FluoQueryStringBuilder setQueryType(QueryType queryType) { + this.queryType = Preconditions.checkNotNull(queryType); + return this; + } + + public String build() throws Exception { + + int valueAlign = 20; + String sparqlHeader = "SPARQL: "; + String idHeader = "QUERY ID: "; + String typeHeader = "QUERY TYPE: "; + String strategiesHeader = "EXPORT STRATEGIES: "; + + StringBuilder builder = new StringBuilder(); + builder.append(idHeader).append(getVariableIndent(valueAlign - idHeader.length())).append(queryId).append("\n") + .append(typeHeader).append(getVariableIndent(valueAlign - typeHeader.length())).append(queryType).append("\n") + .append(strategiesHeader).append(getVariableIndent(valueAlign - strategiesHeader.length())).append(strategies).append("\n") + .append(sparqlHeader).append(getVariableIndent(valueAlign - sparqlHeader.length())).append(getPrettyPrintSparql(sparql, valueAlign)).append("\n"); + + return builder.toString(); + } + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java index c090d37..5405837 100644 --- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java @@ -39,6 +39,7 @@ public class IncrementalUpdateConstants { public static final String CONSTRUCT_PREFIX = "CONSTRUCT"; public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY"; + //binding name reserved for periodic bin id for periodic query results public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId; public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI"; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java index 62f1271..2cb7eff 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java @@ -30,7 +30,6 @@ import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaSubGraph; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; @@ -95,16 +94,21 @@ 
public class ExporterManager implements AutoCloseable { * @throws ResultExportException */ private void exportBindingSet(Map<ExportStrategy, IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws ResultExportException { + VisibilityBindingSet bs; try { - VisibilityBindingSet bs = BS_SERDE.deserialize(data); + bs = BS_SERDE.deserialize(data); simplifyVisibilities(bs); + } catch (Exception e) { + throw new ResultExportException("Unable to deserialize the given BindingSet.", e); + } + try{ for(ExportStrategy strategy: strategies) { IncrementalBindingSetExporter exporter = (IncrementalBindingSetExporter) exporters.get(strategy); exporter.export(pcjId, bs); } } catch (Exception e) { - throw new ResultExportException("Unable to deserialize the provided BindingSet", e); + throw new ResultExportException("Unable to export the given BindingSet " + bs + " with the given set of ExportStrategies " + strategies, e); } } @@ -125,9 +129,14 @@ public class ExporterManager implements AutoCloseable { throw new ResultExportException("Undable to deserialize provided RyaSubgraph", e); } - for(ExportStrategy strategy: strategies) { - IncrementalRyaSubGraphExporter exporter = (IncrementalRyaSubGraphExporter) exporters.get(strategy); - exporter.export(pcjId, subGraph); + try { + for (ExportStrategy strategy : strategies) { + IncrementalRyaSubGraphExporter exporter = (IncrementalRyaSubGraphExporter) exporters.get(strategy); + exporter.export(pcjId, subGraph); + } + } catch (Exception e) { + throw new ResultExportException( + "Unable to export the given subgraph " + subGraph + " using all of the ExportStrategies " + strategies); } } @@ -195,8 +204,6 @@ public class ExporterManager implements AutoCloseable { * @return - ExporterManager for managing IncrementalResultExporters and exporting results */ public ExporterManager build() { - //adds NoOpExporter in the event that users does not want to Export results - addIncrementalResultExporter(new 
NoOpExporter()); return new ExporterManager(exporters); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java deleted file mode 100644 index ab7f2ed..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.indexing.pcj.fluo.app.export; - -import java.util.Set; - -import org.apache.rya.api.client.CreatePCJ.ExportStrategy; -import org.apache.rya.api.client.CreatePCJ.QueryType; -import org.apache.rya.api.domain.RyaSubGraph; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; - -import com.google.common.collect.Sets; - -/** - * This class is a NoOpExporter that can be specified if a user does not - * want their results exported from Fluo. - * - */ -public class NoOpExporter implements IncrementalBindingSetExporter, IncrementalRyaSubGraphExporter { - - @Override - public Set<QueryType> getQueryTypes() { - return Sets.newHashSet(QueryType.CONSTRUCT, QueryType.PROJECTION); - } - - @Override - public ExportStrategy getExportStrategy() { - return ExportStrategy.NO_OP_EXPORT; - } - - @Override - public void close() throws Exception { - } - - @Override - public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException { - } - - @Override - public void export(String queryId, VisibilityBindingSet result) throws ResultExportException { - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java index 4550a50..3687c9f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporterParameters.java @@ 
-26,7 +26,6 @@ import org.apache.kafka.common.serialization.StringSerializer; import com.google.common.base.Preconditions; - public class KafkaBindingSetExporterParameters extends KafkaExportParameterBase { public static final String CONF_USE_KAFKA_BINDING_SET_EXPORTER = "pcj.fluo.export.kafka.bindingset.enabled"; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java index 604462b..5a8f01c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/rya/PeriodicBindingSetExporter.java @@ -52,7 +52,7 @@ public class PeriodicBindingSetExporter implements IncrementalBindingSetExporter @Override public ExportStrategy getExportStrategy() { - return ExportStrategy.RYA; + return ExportStrategy.PERIODIC; } @Override http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java index ba7beee..e07c514 100644 --- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/observers/QueryResultObserver.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.fluo.app.observers; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import static org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns.QUERY_BINDING_SET; import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; @@ -35,7 +36,6 @@ import org.apache.rya.indexing.pcj.fluo.app.export.kafka.KafkaRyaSubGraphExporte import org.apache.rya.indexing.pcj.fluo.app.export.rya.PeriodicBindingSetExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaBindingSetExporterFactory; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaSubGraphExporterFactory; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; @@ -66,7 +66,7 @@ public class QueryResultObserver extends AbstractObserver { @Override public ObservedColumn getObservedColumn() { - return new ObservedColumn(FluoQueryColumns.QUERY_BINDING_SET, NotificationType.STRONG); + return new ObservedColumn(QUERY_BINDING_SET, NotificationType.STRONG); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java index 17ab14f..a1c7c00 100644 --- 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQuery.java @@ -92,11 +92,7 @@ public class FluoQuery { this.statementPatternMetadata = requireNonNull(statementPatternMetadata); this.filterMetadata = requireNonNull(filterMetadata); this.joinMetadata = requireNonNull(joinMetadata); - if(constructMetadata.isPresent()) { - this.type = QueryType.CONSTRUCT; - } else { - this.type = QueryType.PROJECTION; - } + this.type = queryMetadata.getQueryType(); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java index 8569a48..6ca0e8d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryColumns.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.List; import org.apache.fluo.api.data.Column; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.indexing.pcj.fluo.app.AggregationResultUpdater.AggregationState; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/63f87b86/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java index d5d9fe7..1cf2825 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/query/FluoQueryMetadataDAO.java @@ -113,8 +113,10 @@ public class FluoQueryMetadataDAO { final String[] exportStrategies = values.get(FluoQueryColumns.QUERY_EXPORT_STRATEGIES).split(IncrementalUpdateConstants.VAR_DELIM); Set<ExportStrategy> strategies = new HashSet<>(); - for(String strategy: exportStrategies) { - strategies.add(ExportStrategy.valueOf(strategy)); + for (String strategy : exportStrategies) { + if (!strategy.isEmpty()) { + strategies.add(ExportStrategy.valueOf(strategy)); + } } return QueryMetadata.builder(nodeId)
