RYA-246-Query-Export-Strategy. Closes #213.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/05147266 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/05147266 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/05147266 Branch: refs/heads/master Commit: 0514726604757c03e66015edee742b0fbdcf1ca2 Parents: 82df3ad Author: Caleb Meier <[email protected]> Authored: Mon Aug 7 21:22:00 2017 -0700 Committer: Caleb Meier <[email protected]> Committed: Fri Aug 25 12:34:42 2017 -0700 ---------------------------------------------------------------------- .../org/apache/rya/api/client/CreatePCJ.java | 37 ++++ .../api/client/accumulo/AccumuloCreatePCJ.java | 27 ++- .../api/client/accumulo/AccumuloDeletePCJ.java | 6 +- .../rya/api/client/accumulo/FluoITBase.java | 2 +- .../src/main/java/RyaClientExample.java | 2 +- .../storage/accumulo/ShiftVarOrderFactory.java | 1 + .../indexing/pcj/fluo/api/CreateFluoPcj.java | 146 +++++++++---- .../indexing/pcj/fluo/api/DeleteFluoPcj.java | 127 +---------- .../indexing/pcj/fluo/api/GetPcjMetadata.java | 10 +- .../indexing/pcj/fluo/api/GetQueryReport.java | 30 +-- .../rya/indexing/pcj/fluo/api/ListQueryIds.java | 2 +- .../fluo/app/IncrementalUpdateConstants.java | 3 - .../pcj/fluo/app/export/ExporterManager.java | 216 +++++++++++++++++++ .../export/IncrementalBindingSetExporter.java | 8 +- .../IncrementalBindingSetExporterFactory.java | 104 --------- .../app/export/IncrementalResultExporter.java | 42 ++++ .../IncrementalResultExporterFactory.java | 104 +++++++++ .../export/IncrementalRyaSubGraphExporter.java | 2 +- .../IncrementalRyaSubGraphExporterFactory.java | 47 ---- .../pcj/fluo/app/export/NoOpExporter.java | 59 +++++ .../export/kafka/KafkaBindingSetExporter.java | 25 ++- .../kafka/KafkaBindingSetExporterFactory.java | 13 +- .../KafkaBindingSetExporterParameters.java | 80 +++++++ .../export/kafka/KafkaExportParameterBase.java | 86 ++++++++ 
.../app/export/kafka/KafkaExportParameters.java | 86 -------- .../export/kafka/KafkaRyaSubGraphExporter.java | 16 ++ .../kafka/KafkaRyaSubGraphExporterFactory.java | 17 +- .../kafka/KafkaSubGraphExporterParameters.java | 81 +++++++ .../export/rya/PeriodicBindingSetExporter.java | 71 ++++++ .../rya/PeriodicBindingSetExporterFactory.java | 74 +++++++ .../app/export/rya/RyaBindingSetExporter.java | 43 ++-- .../rya/RyaBindingSetExporterFactory.java | 15 +- .../app/export/rya/RyaExportParameters.java | 33 ++- .../export/rya/RyaSubGraphExportParameters.java | 120 +++++++++++ .../app/export/rya/RyaSubGraphExporter.java | 106 +++++++++ .../export/rya/RyaSubGraphExporterFactory.java | 58 +++++ .../fluo/app/observers/AggregationObserver.java | 3 - .../fluo/app/observers/BindingSetUpdater.java | 2 +- .../observers/ConstructQueryResultObserver.java | 167 ++------------ .../pcj/fluo/app/observers/FilterObserver.java | 3 - .../pcj/fluo/app/observers/JoinObserver.java | 3 - .../app/observers/PeriodicQueryObserver.java | 2 - .../fluo/app/observers/ProjectionObserver.java | 2 - .../fluo/app/observers/QueryResultObserver.java | 94 ++++---- .../app/observers/StatementPatternObserver.java | 4 - .../indexing/pcj/fluo/app/query/FluoQuery.java | 20 +- .../pcj/fluo/app/query/FluoQueryColumns.java | 26 --- .../fluo/app/query/FluoQueryMetadataDAO.java | 19 +- .../pcj/fluo/app/query/QueryMetadata.java | 5 +- .../fluo/app/query/SparqlFluoQueryBuilder.java | 24 ++- .../app/query/UnsupportedQueryException.java | 41 ++++ .../pcj/fluo/app/util/FluoQueryUtils.java | 8 + .../export/rya/KafkaExportParametersTest.java | 25 +-- .../app/export/rya/RyaExportParametersTest.java | 6 +- .../fluo/app/query/PeriodicQueryUtilTest.java | 2 +- .../app/query/QueryMetadataVisitorTest.java | 2 +- .../pcj/fluo/client/PcjAdminClient.java | 4 + .../pcj/fluo/client/PcjAdminClientCommand.java | 4 +- .../fluo/client/command/NewQueryCommand.java | 3 +- .../fluo/client/command/QueryReportCommand.java | 3 +- 
.../fluo/client/util/QueryReportRenderer.java | 10 +- .../rya/indexing/pcj/fluo/demo/DemoDriver.java | 2 +- .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 7 +- .../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 5 +- .../indexing/pcj/fluo/api/ListQueryIdsIT.java | 10 +- .../fluo/app/query/FluoQueryMetadataDAOIT.java | 24 +-- .../indexing/pcj/fluo/integration/BatchIT.java | 3 +- .../pcj/fluo/integration/CreateDeleteIT.java | 7 +- .../pcj/fluo/integration/KafkaExportIT.java | 20 +- .../integration/KafkaRyaSubGraphExportIT.java | 22 +- .../indexing/pcj/fluo/integration/QueryIT.java | 6 +- .../pcj/fluo/test/base/KafkaExportITBase.java | 59 ++--- .../rya/pcj/fluo/test/base/RyaExportITBase.java | 3 +- .../PeriodicNotificationProviderIT.java | 3 +- .../pruner/PeriodicNotificationBinPrunerIT.java | 6 +- .../notification/api/CreatePeriodicQuery.java | 19 +- .../pruner/PeriodicQueryPruner.java | 9 +- .../recovery/PeriodicNotificationProvider.java | 3 +- .../org/apache/rya/shell/RyaAdminCommands.java | 21 +- .../apache/rya/shell/RyaAdminCommandsTest.java | 12 +- 80 files changed, 1698 insertions(+), 924 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java index e03a1f1..6e92b28 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/CreatePCJ.java @@ -18,6 +18,8 @@ */ package org.apache.rya.api.client; +import java.util.Set; + import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -28,7 +30,41 @@ import edu.umd.cs.findbugs.annotations.NonNull; public 
interface CreatePCJ { /** + * Metadata enum used to indicate the type of query that is registered. If + * the topmost node is a Construct QueryNode, then the type is Construct. If the + * topmost node is a Projection QueryNode, then the type is Projection. If the + * query contains a PeriodicQuery Filter anywhere within the query, then it is of type + * Periodic. + * + */ + public static enum QueryType{CONSTRUCT, PROJECTION, PERIODIC}; + + /** + * Specifies the how Results will be exported from the Rya Fluo + * Application. + * + */ + public static enum ExportStrategy{RYA, KAFKA, NO_OP_EXPORT}; + + + /** + * Designate a new PCJ that will be maintained by the target instance of Rya. + * Results will be exported according to the specified export strategies. + * + * @param instanceName - Indicates which Rya instance will create and maintain + * the PCJ. (not null) + * @param sparql - The SPARQL query that will be maintained. (not null) + * @param strategies - The export strategies used to export results for this query + * @return The ID that was assigned to this newly created PCJ. + * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name. + * @throws RyaClientException Something caused the command to fail. + */ + public String createPCJ(final String instanceName, String sparql, Set<ExportStrategy> strategies) throws InstanceDoesNotExistException, RyaClientException; + + + /** * Designate a new PCJ that will be maintained by the target instance of Rya. + * Results will be exported to a Rya PCJ table. * * @param instanceName - Indicates which Rya instance will create and maintain * the PCJ. (not null) @@ -38,4 +74,5 @@ public interface CreatePCJ { * @throws RyaClientException Something caused the command to fail. 
*/ public String createPCJ(final String instanceName, String sparql) throws InstanceDoesNotExistException, RyaClientException; + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java index 644189a..6aef33c 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java @@ -20,6 +20,8 @@ package org.apache.rya.api.client.accumulo; import static java.util.Objects.requireNonNull; +import java.util.Set; + import org.apache.accumulo.core.client.Connector; import org.apache.fluo.api.client.FluoClient; import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; @@ -39,6 +41,7 @@ import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; @@ -49,6 +52,7 @@ import org.openrdf.repository.RepositoryException; import org.openrdf.sail.SailException; import com.google.common.base.Optional; +import com.google.common.collect.Sets; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -73,7 
+77,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { } @Override - public String createPCJ(final String instanceName, final String sparql) throws InstanceDoesNotExistException, RyaClientException { + public String createPCJ(final String instanceName, final String sparql, Set<ExportStrategy> strategies) throws InstanceDoesNotExistException, RyaClientException { requireNonNull(instanceName); requireNonNull(sparql); @@ -99,9 +103,14 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { if(fluoDetailsHolder.isPresent()) { final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); try { - updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId); + updateFluoApp(instanceName, fluoAppName, pcjId, sparql, strategies); } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) { throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); + } catch (UnsupportedQueryException e) { + throw new RyaClientException("The new PCJ could not be initialized because it either contains an unsupported query node " + + "or an invalid ExportStrategy for the given QueryType. Projection queries can be exported to either Rya or Kafka," + + "unless they contain an aggregation, in which case they can only be exported to Kafka. Construct queries can be exported" + + "to Rya and Kafka, and Periodic queries can only be exported to Rya."); } // Update the Rya Details to indicate the PCJ is being updated incrementally. 
@@ -133,9 +142,16 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { } } - private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException { - requireNonNull(pcjStorage); + @Override + public String createPCJ(String instanceName, String sparql) throws InstanceDoesNotExistException, RyaClientException { + return createPCJ(instanceName, sparql, Sets.newHashSet(ExportStrategy.RYA)); + } + + + private void updateFluoApp(final String ryaInstance, final String fluoAppName, final String pcjId, String sparql, Set<ExportStrategy> strategies) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException, UnsupportedQueryException { + requireNonNull(sparql); requireNonNull(pcjId); + requireNonNull(strategies); // Connect to the Fluo application that is updating this instance's PCJs. final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); @@ -147,7 +163,8 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { fluoAppName);) { // Initialize the PCJ within the Fluo application. 
final CreateFluoPcj fluoCreatePcj = new CreateFluoPcj(); - fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance); + fluoCreatePcj.withRyaIntegration(pcjId, sparql, strategies, fluoClient, getConnector(), ryaInstance); } } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java index eb2b2d7..547254d 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java @@ -123,7 +123,11 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ { cd.getZookeepers(), fluoAppName)) { // Delete the PCJ from the Fluo App. - new DeleteFluoPcj(1000).deletePcj(fluoClient, pcjId); + try { + new DeleteFluoPcj(1000).deletePcj(fluoClient, pcjId); + } catch (Exception e) { + log.warn("PcjId corresponds to an invalid PCJ. 
The query cannot be deleted."); + } } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java index 113b397..695704b 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java @@ -199,7 +199,7 @@ public abstract class FluoITBase { final HashMap<String, String> params = new HashMap<>(); final RyaExportParameters ryaParams = new RyaExportParameters(params); - ryaParams.setExportToRya(true); + ryaParams.setUseRyaBindingSetExporter(true); ryaParams.setAccumuloInstanceName(instanceName); ryaParams.setZookeeperServers(zookeepers); ryaParams.setExporterUsername(clusterInstance.getUsername()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/indexingExample/src/main/java/RyaClientExample.java ---------------------------------------------------------------------- diff --git a/extras/indexingExample/src/main/java/RyaClientExample.java b/extras/indexingExample/src/main/java/RyaClientExample.java index 1b0450f..b0afd5a 100644 --- a/extras/indexingExample/src/main/java/RyaClientExample.java +++ b/extras/indexingExample/src/main/java/RyaClientExample.java @@ -249,7 +249,7 @@ public class RyaClientExample { // export observer. 
final HashMap<String, String> params = new HashMap<>(); final RyaExportParameters ryaParams = new RyaExportParameters(params); - ryaParams.setExportToRya(true); + ryaParams.setUseRyaBindingSetExporter(true); ryaParams.setAccumuloInstanceName(instanceName); ryaParams.setZookeeperServers(zookeepers); ryaParams.setExporterUsername(username); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java index 26c4339..e297ec9 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactory.java @@ -46,6 +46,7 @@ public class ShiftVarOrderFactory implements PcjVarOrderFactory { final Set<String> bindingNames = new SPARQLParser().parseQuery(sparql, null) .getTupleExpr() .getBindingNames(); + return makeVarOrders( new VariableOrder(bindingNames) ); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java index 150a256..501f1f5 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java +++ 
b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java @@ -27,7 +27,6 @@ import java.io.UnsupportedEncodingException; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.UUID; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -38,6 +37,7 @@ import org.apache.fluo.api.client.Transaction; import org.apache.log4j.Logger; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaType; import org.apache.rya.api.domain.RyaURI; @@ -50,6 +50,8 @@ import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; @@ -61,6 +63,7 @@ import org.openrdf.query.MalformedQueryException; import org.openrdf.query.algebra.StatementPattern; import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; @@ -128,60 +131,92 @@ public class CreateFluoPcj { /** * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated - * inside of Fluo. 
This method assumes that the user will export the results to Kafka or - * some other external resource. The export id is equivalent to the queryId that is returned, - * which is in contrast to the other createPcj methods in this class which accept an external pcjId - * that is used to identify the Accumulo table or Kafka topic for exporting results. + * inside of Fluo. This method assumes that the user will export the results to Kafka + * according to the Kafka {@link ExportStrategy}. * * @param sparql - sparql query String to be registered with Fluo * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) * @return The metadata that was written to the Fluo application for the PCJ. * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws UnsupportedQueryException * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. */ - public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException { + public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException { Preconditions.checkNotNull(sparql); Preconditions.checkNotNull(fluo); - String pcjId = UUID.randomUUID().toString().replaceAll("-", ""); - return createPcj(pcjId, sparql, fluo); + String pcjId = FluoQueryUtils.createNewPcjId(); + return createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.KAFKA), fluo); } /** * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method provides * no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely - * creates the FluoQuery (metadata) inside of Fluo so that results and be incrementally generated - * inside of Fluo. This method assumes that the user will export the results to Kafka or - * some other external resource. 
+ * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated + * inside of Fluo. Results are exported according to the Set of {@link ExportStrategy} enums. If + * the Rya ExportStrategy is specified, care should be taken to verify that the PCJ table exists. * * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) * @param sparql - sparql query String to be registered with Fluo + * @param strategies - ExportStrategies used to specify how final results will be handled * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) * @return The metadata that was written to the Fluo application for the PCJ. - * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + * @throws UnsupportedQueryException + * @throws MalformedQueryException */ public FluoQuery createPcj( final String pcjId, final String sparql, - final FluoClient fluo) throws MalformedQueryException { + final Set<ExportStrategy> strategies, + final FluoClient fluo) throws MalformedQueryException, UnsupportedQueryException { requireNonNull(pcjId); requireNonNull(sparql); + requireNonNull(strategies); requireNonNull(fluo); - FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId); + FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId, strategies); writeFluoQuery(fluo, fluoQuery, pcjId); return fluoQuery; } - private FluoQuery makeFluoQuery(String sparql, String pcjId) throws MalformedQueryException { + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an + * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists. + * Results are exported to a PCJ table with the provided pcjId according to the Rya + * {@link ExportStrategy}. + * + * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) + * @param pcjStorage - Provides access to the PCJ index. 
(not null) + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @return The metadata that was written to the Fluo application for the PCJ. + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + * @throws UnsupportedQueryException + */ + public FluoQuery createPcj( + final String pcjId, + final PrecomputedJoinStorage pcjStorage, + final FluoClient fluo) throws MalformedQueryException, PcjException, UnsupportedQueryException { + requireNonNull(pcjId); + requireNonNull(pcjStorage); + requireNonNull(fluo); + + // Parse the query's structure for the metadata that will be written to fluo. + final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = pcjMetadata.getSparql(); + return createPcj(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo); + } + + private FluoQuery makeFluoQuery(String sparql, String pcjId, Set<ExportStrategy> strategies) throws MalformedQueryException, UnsupportedQueryException { String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); - SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); - builder.setFluoQueryId(queryId); - builder.setSparql(sparql); - builder.setJoinBatchSize(joinBatchSize); + SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder() + .setExportStrategies(strategies) + .setFluoQueryId(queryId) + .setSparql(sparql) + .setJoinBatchSize(joinBatchSize); return builder.build(); } @@ -195,56 +230,72 @@ public class CreateFluoPcj { tx.commit(); } } - /** - * Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an - * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists. + * Tells the Fluo PCJ Updater application to maintain a new PCJ. 
+ * <p> + * This call scans Rya for Statement Pattern matches and inserts them into + * the Fluo application. It is assumed that results for any query registered + * using this method will be exported to Kafka according to the Kafka {@link ExportStrategy}. * - * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) - * @param pcjStorage - Provides access to the PCJ index. (not null) + * @param sparql - sparql query that will registered with Fluo. (not null) * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) - * @return The metadata that was written to the Fluo application for the PCJ. + * @param accumulo - Accumulo connector for connecting with Accumulo + * @param ryaInstance - Name of Rya instance to connect to + * @return The Fluo application's Query ID of the query that was created. * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. + * @throws UnsupportedQueryException */ - public FluoQuery createPcj( - final String pcjId, - final PrecomputedJoinStorage pcjStorage, - final FluoClient fluo) throws MalformedQueryException, PcjException { - requireNonNull(pcjId); - requireNonNull(pcjStorage); + public String withRyaIntegration( + final String sparql, + final FluoClient fluo, + final Connector accumulo, + final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException { + requireNonNull(sparql); requireNonNull(fluo); + requireNonNull(accumulo); + requireNonNull(ryaInstance); - // Parse the query's structure for the metadata that will be written to fluo. 
- final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); - final String sparql = pcjMetadata.getSparql(); - return createPcj(pcjId, sparql, fluo); + + // Write the SPARQL query's structure to the Fluo Application. + final FluoQuery fluoQuery = createPcj(sparql, fluo); + //import results already ingested into Rya that match query + importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance); + // return queryId to the caller for later monitoring from the export. + return fluoQuery.getQueryMetadata().getNodeId(); } + /** * Tells the Fluo PCJ Updater application to maintain a new PCJ. * <p> * This call scans Rya for Statement Pattern matches and inserts them into * the Fluo application. This method does not verify that a PcjTable with the - * the given pcjId actually exists. It is assumed that results for any query registered - * using this method will be exported to Kafka or some other external service. + * the given pcjId actually exists, so one should verify that the table exists before + * using the Rya ExportStrategy. Results will be exported according to the Set of + * {@link ExportStrategy} enums. * * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) * @param sparql - sparql query that will registered with Fluo. (not null) + * @param strategies - ExportStrategies used to specify how final results will be handled * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) - * @param queryEngine - QueryEngine for a given Rya Instance, (not null) + * @param accumulo - Accumulo connector for connecting with Accumulo + * @param ryaInstance - name of Rya instance to connect to * @return The Fluo application's Query ID of the query that was created. * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. 
* @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. + * @throws UnsupportedQueryException */ public String withRyaIntegration( final String pcjId, final String sparql, + final Set<ExportStrategy> strategies, final FluoClient fluo, final Connector accumulo, - final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException { + final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException { requireNonNull(pcjId); requireNonNull(sparql); requireNonNull(fluo); @@ -253,14 +304,13 @@ public class CreateFluoPcj { // Write the SPARQL query's structure to the Fluo Application. - final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo); + final FluoQuery fluoQuery = createPcj(pcjId, sparql, strategies, fluo); //import results already ingested into Rya that match query importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance); // return queryId to the caller for later monitoring from the export. return fluoQuery.getQueryMetadata().getNodeId(); } - /** * Tells the Fluo PCJ Updater application to maintain a new PCJ. * <p> @@ -268,24 +318,26 @@ public class CreateFluoPcj { * the Fluo application. The Fluo application will then maintain the intermediate * results as new triples are inserted and export any new query results to the * {@code pcjId} within the provided {@code pcjStorage}. This method requires that a - * PCJ table already exist for the query corresponding to the pcjId. Results will be exported - * to this table. + * PCJ table already exist for the query corresponding to the pcjId. By default, results will be exported + * to this table according to the Rya {@link ExportStrategy}. * * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) * @param pcjStorage - Provides access to the PCJ index. (not null) * @param fluo - A connection to the Fluo application that updates the PCJ index. 
(not null) - * @param queryEngine - QueryEngine for a given Rya Instance, (not null) + * @param accumulo - Accumuo connector for connecting to Accumulo + * @param ryaInstance - name of Rya instance to connect to * @return The Fluo application's Query ID of the query that was created. * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. + * @throws UnsupportedQueryException */ public String withRyaIntegration( final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, final Connector accumulo, - final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException { + final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException, UnsupportedQueryException { requireNonNull(pcjId); requireNonNull(pcjStorage); requireNonNull(fluo); @@ -296,9 +348,11 @@ public class CreateFluoPcj { final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); final String sparql = pcjMetadata.getSparql(); - return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance); + return withRyaIntegration(pcjId, sparql, Sets.newHashSet(ExportStrategy.RYA), fluo, accumulo, ryaInstance); } + + private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance) throws RyaDAOException { // Reuse the same set object while performing batch inserts. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java index 58a52fb..0d97b2f 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java @@ -21,7 +21,6 @@ package org.apache.rya.indexing.pcj.fluo.api; import static com.google.common.base.Preconditions.checkArgument; import static java.util.Objects.requireNonNull; -import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -33,15 +32,10 @@ import org.apache.fluo.api.data.Column; import org.apache.fluo.api.data.RowColumnValue; import org.apache.fluo.api.data.Span; import org.apache.rya.indexing.pcj.fluo.app.NodeType; -import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; -import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; +import 
org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.openrdf.query.BindingSet; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -85,8 +79,9 @@ public class DeleteFluoPcj { * Index. (not null) * @param pcjId - The PCJ ID for the query that will removed from the Fluo * application. (not null) + * @throws UnsupportedQueryException */ - public void deletePcj(final FluoClient client, final String pcjId) { + public void deletePcj(final FluoClient client, final String pcjId) throws UnsupportedQueryException { requireNonNull(client); requireNonNull(pcjId); @@ -109,84 +104,17 @@ public class DeleteFluoPcj { * @param tx - Transaction of a given Fluo table. (not null) * @param pcjId - Id of query. (not null) * @return list of Node IDs associated with the query {@code pcjId}. + * @throws UnsupportedQueryException */ - private List<String> getNodeIds(Transaction tx, String pcjId) { + private List<String> getNodeIds(Transaction tx, String pcjId) throws UnsupportedQueryException { requireNonNull(tx); requireNonNull(pcjId); - // Get the ID that tracks the query within the Fluo application. - final String queryId = getQueryIdFromPcjId(tx, pcjId); - - // Get the query's children nodes. - final List<String> nodeIds = new ArrayList<>(); - nodeIds.add(queryId); - getChildNodeIds(tx, queryId, nodeIds); - return nodeIds; + String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); + FluoQuery fluoQuery = dao.readFluoQuery(tx, queryId); + return FluoQueryUtils.collectNodeIds(fluoQuery); } - /** - * Recursively navigate query tree to extract all of the nodeIds. - * - * @param tx - Transaction of a given Fluo table. (not null) - * @param nodeId - Current node in query tree. (not null) - * @param nodeIds - The Node IDs extracted from query tree. 
(not null) - */ - private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) { - requireNonNull(tx); - requireNonNull(nodeId); - requireNonNull(nodeIds); - - final NodeType type = NodeType.fromNodeId(nodeId).get(); - switch (type) { - case QUERY: - final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId); - final String queryChild = queryMeta.getChildNodeId(); - nodeIds.add(queryChild); - getChildNodeIds(tx, queryChild, nodeIds); - break; - case CONSTRUCT: - final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId); - final String constructChild = constructMeta.getChildNodeId(); - nodeIds.add(constructChild); - getChildNodeIds(tx, constructChild, nodeIds); - break; - case JOIN: - final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId); - final String lchild = joinMeta.getLeftChildNodeId(); - final String rchild = joinMeta.getRightChildNodeId(); - nodeIds.add(lchild); - nodeIds.add(rchild); - getChildNodeIds(tx, lchild, nodeIds); - getChildNodeIds(tx, rchild, nodeIds); - break; - case FILTER: - final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId); - final String filterChild = filterMeta.getChildNodeId(); - nodeIds.add(filterChild); - getChildNodeIds(tx, filterChild, nodeIds); - break; - case AGGREGATION: - final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId); - final String aggChild = aggMeta.getChildNodeId(); - nodeIds.add(aggChild); - getChildNodeIds(tx, aggChild, nodeIds); - break; - case PERIODIC_QUERY: - final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId); - final String periodicChild = periodicMeta.getChildNodeId(); - nodeIds.add(periodicChild); - getChildNodeIds(tx, periodicChild, nodeIds); - break; - case PROJECTION: - final ProjectionMetadata projectionMetadata = dao.readProjectionMetadata(tx, nodeId); - final String projectionChild = projectionMetadata.getChildNodeId(); - 
nodeIds.add(projectionChild); - getChildNodeIds(tx, projectionChild, nodeIds); - break; - case STATEMENT_PATTERN: - break; - } - } /** * Deletes metadata for all nodeIds associated with a given queryId in a @@ -203,8 +131,6 @@ public class DeleteFluoPcj { requireNonNull(pcjId); try (final Transaction typeTx = tx) { - deletePcjIdAndSparqlMetadata(typeTx, pcjId); - for (final String nodeId : nodeIds) { final NodeType type = NodeType.fromNodeId(nodeId).get(); deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns()); @@ -232,24 +158,6 @@ public class DeleteFluoPcj { } /** - * Deletes high level query meta for converting from queryId to pcjId and - * vice versa, as well as converting from sparql to queryId. - * - * @param tx - Transaction the deletes will be performed with. (not null) - * @param pcjId - The PCJ whose metadata will be deleted. (not null) - */ - private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) { - requireNonNull(tx); - requireNonNull(pcjId); - - final String queryId = getQueryIdFromPcjId(tx, pcjId); - final String sparql = getSparqlFromQueryId(tx, queryId); - tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID); - tx.delete(sparql, FluoQueryColumns.QUERY_ID); - tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID); - } - - /** * Deletes all results (BindingSets or Statements) associated with the specified nodeId. * * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. 
(not null) @@ -294,19 +202,4 @@ public class DeleteFluoPcj { } } - private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) { - requireNonNull(tx); - requireNonNull(pcjId); - - final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID); - return queryIdBytes.toString(); - } - - private String getSparqlFromQueryId(final Transaction tx, final String queryId) { - requireNonNull(tx); - requireNonNull(queryId); - - final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId); - return metadata.getSparql(); - } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java index 061a1d5..d08cb73 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadata.java @@ -24,15 +24,13 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.data.Bytes; - /** * Get {@link PcjMetadata} for queries that are managed by the Fluo app. 
*/ @@ -87,7 +85,7 @@ public class GetPcjMetadata { // Lookup the Rya PCJ ID associated with the query. String pcjId = null; try(Snapshot snap = fluo.newSnapshot() ) { - pcjId = snap.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); + pcjId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId); if(pcjId == null) { throw new NotInFluoException("Could not get the PcjMetadata for queryId '" + queryId + "' because a Rya PCJ ID not stored in the Fluo table."); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java index 1fb1485..ddbaaaf 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReport.java @@ -25,27 +25,27 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import edu.umd.cs.findbugs.annotations.Nullable; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import net.jcip.annotations.Immutable; - +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Snapshot; +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.client.scanner.ColumnScanner; +import org.apache.fluo.api.client.scanner.RowScanner; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.Span; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import 
org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.UnsupportedQueryException; import com.google.common.collect.ImmutableMap; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Snapshot; -import org.apache.fluo.api.client.SnapshotBase; -import org.apache.fluo.api.client.scanner.ColumnScanner; -import org.apache.fluo.api.client.scanner.RowScanner; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.Span; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import net.jcip.annotations.Immutable; /** * Get a reports that indicates how many binding sets have been emitted for @@ -63,8 +63,9 @@ public class GetQueryReport { * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null) * @return A map from Query ID to QueryReport that holds a report for all of * the queries that are being managed within the fluo app. + * @throws UnsupportedQueryException */ - public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) { + public Map<String, QueryReport> getAllQueryReports(final FluoClient fluo) throws UnsupportedQueryException { checkNotNull(fluo); // Fetch the queries that are being managed by the Fluo. @@ -85,8 +86,9 @@ public class GetQueryReport { * @param fluo - The connection to Fluo that will be used to fetch the metadata. (not null) * @param queryId - The ID of the query to fetch. (not null) * @return A report that was built for the query. 
+ * @throws UnsupportedQueryException */ - public QueryReport getReport(final FluoClient fluo, final String queryId) { + public QueryReport getReport(final FluoClient fluo, final String queryId) throws UnsupportedQueryException { checkNotNull(fluo); checkNotNull(queryId); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java index df1648b..e09d0c6 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/ListQueryIds.java @@ -52,7 +52,7 @@ public class ListQueryIds { try(Snapshot snap = fluo.newSnapshot() ) { // Create an iterator that iterates over the QUERY_ID column. 
- final CellScanner cellScanner = snap.scanner().fetch( FluoQueryColumns.QUERY_ID).build(); + final CellScanner cellScanner = snap.scanner().fetch( FluoQueryColumns.QUERY_NODE_ID).build(); for (RowColumnValue rcv : cellScanner) { queryIds.add(rcv.getsValue()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java index 4b6f44e..c090d37 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/IncrementalUpdateConstants.java @@ -39,9 +39,6 @@ public class IncrementalUpdateConstants { public static final String CONSTRUCT_PREFIX = "CONSTRUCT"; public static final String PERIODIC_QUERY_PREFIX = "PERIODIC_QUERY"; - public static enum QueryType{Construct, Projection, Periodic}; - public static enum ExportStrategy{Rya, Kafka}; - public static final String PERIODIC_BIN_ID = PeriodicQueryResultStorage.PeriodicBinId; public static final String URI_TYPE = "http://www.w3.org/2001/XMLSchema#anyURI"; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java 
new file mode 100644 index 0000000..62f1271 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/ExporterManager.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export; + +import java.io.UnsupportedEncodingException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import org.apache.fluo.api.data.Bytes; +import org.apache.rya.accumulo.utils.VisibilitySimplifier; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; +import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter.ResultExportException; +import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSetSerDe; + +import com.google.common.base.Preconditions; + +/** + * This class manages all of the {@link IncrementalResultExporter}s for the Rya Fluo Application. + * It maps the {@link FluoQuery}'s {@link QueryType} and Set of {@link ExportStrategy} objects + * to the correct IncrementalResultExporter. 
+ * + */ +public class ExporterManager implements AutoCloseable { + + private static final VisibilityBindingSetSerDe BS_SERDE = new VisibilityBindingSetSerDe(); + private static final RyaSubGraphKafkaSerDe SG_SERDE = new RyaSubGraphKafkaSerDe(); + private Map<String, String> simplifiedVisibilities = new HashMap<>(); + + private Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> exporters; + + private ExporterManager(Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> exporters) { + this.exporters = Preconditions.checkNotNull(exporters); + } + + /** + * @return {@link Builder} for constructing an instance of an ExporterManager. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Maps the data to the correct {@link IncrementalResultExporter} using the provided + * QueryType and ExportStrategies to be exported. + * @param type - QueryType that produced the result + * @param strategies - ExportStrategies used to export the result + * @param queryId - Fluo Query Id for the query that produced the result + * @param data - Serialized result to be exported + * @throws ResultExportException + */ + public void export(QueryType type, Set<ExportStrategy> strategies, String queryId, Bytes data) throws ResultExportException { + + String pcjId = FluoQueryUtils.convertFluoQueryIdToPcjId(queryId); + + if(type == QueryType.CONSTRUCT) { + exportSubGraph(exporters.get(type), strategies, pcjId, data); + } else { + exportBindingSet(exporters.get(type), strategies, pcjId, data); + } + + } + + /** + * Exports BindingSet using the exporters for a given {@link QueryType}. 
+ * @param exporters - exporters corresponding to a given queryType + * @param strategies - export strategies used to export results (possibly a subset of those in the exporters map) + * @param pcjId - id of the query whose results are being exported + * @param data - serialized BindingSet result + * @throws ResultExportException + */ + private void exportBindingSet(Map<ExportStrategy, IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws ResultExportException { + try { + VisibilityBindingSet bs = BS_SERDE.deserialize(data); + simplifyVisibilities(bs); + + for(ExportStrategy strategy: strategies) { + IncrementalBindingSetExporter exporter = (IncrementalBindingSetExporter) exporters.get(strategy); + exporter.export(pcjId, bs); + } + } catch (Exception e) { + throw new ResultExportException("Unable to deserialize the provided BindingSet", e); + } + } + + /** + * Exports RyaSubGraph using the exporters for a given {@link QueryType}. + * @param exporters - exporters corresponding to a given queryType + * @param strategies - export strategies used to export results (possibly a subset of those in the exporters map) + * @param pcjId - id of the query whose results are being exported + * @param data - serialized RyaSubGraph result + * @throws ResultExportException + */ + private void exportSubGraph(Map<ExportStrategy, IncrementalResultExporter> exporters, Set<ExportStrategy> strategies, String pcjId, Bytes data) throws ResultExportException { + RyaSubGraph subGraph = SG_SERDE.fromBytes(data.toArray()); + + try { + simplifyVisibilities(subGraph); + } catch (UnsupportedEncodingException e) { + throw new ResultExportException("Undable to deserialize provided RyaSubgraph", e); + } + + for(ExportStrategy strategy: strategies) { + IncrementalRyaSubGraphExporter exporter = (IncrementalRyaSubGraphExporter) exporters.get(strategy); + exporter.export(pcjId, subGraph); + } + } + + private void simplifyVisibilities(VisibilityBindingSet 
result) { + // Simplify the result's visibilities. + final String visibility = result.getVisibility(); + if(!simplifiedVisibilities.containsKey(visibility)) { + final String simplified = VisibilitySimplifier.simplify( visibility ); + simplifiedVisibilities.put(visibility, simplified); + } + result.setVisibility( simplifiedVisibilities.get(visibility) ); + } + + private void simplifyVisibilities(RyaSubGraph subgraph) throws UnsupportedEncodingException { + Set<RyaStatement> statements = subgraph.getStatements(); + if (statements.size() > 0) { + byte[] visibilityBytes = statements.iterator().next().getColumnVisibility(); + // Simplify the result's visibilities and cache new simplified + // visibilities + String visibility = new String(visibilityBytes, "UTF-8"); + if (!simplifiedVisibilities.containsKey(visibility)) { + String simplified = VisibilitySimplifier.simplify(visibility); + simplifiedVisibilities.put(visibility, simplified); + } + + for (RyaStatement statement : statements) { + statement.setColumnVisibility(simplifiedVisibilities.get(visibility).getBytes("UTF-8")); + } + + subgraph.setStatements(statements); + } + } + + public static class Builder { + + private Map<QueryType, Map<ExportStrategy, IncrementalResultExporter>> exporters = new HashMap<>(); + + /** + * Add an {@link IncrementalResultExporter} to be used by this ExporterManager for exporting results + * @param exporter - IncrementalResultExporter for exporting query results + * @return - Builder for chaining method calls + */ + public Builder addIncrementalResultExporter(IncrementalResultExporter exporter) { + + Set<QueryType> types = exporter.getQueryTypes(); + ExportStrategy strategy = exporter.getExportStrategy(); + + for (QueryType type : types) { + if (!exporters.containsKey(type)) { + Map<ExportStrategy, IncrementalResultExporter> exportMap = new HashMap<>(); + exportMap.put(strategy, exporter); + exporters.put(type, exportMap); + } else { + Map<ExportStrategy, IncrementalResultExporter> 
exportMap = exporters.get(type); + if (!exportMap.containsKey(strategy)) { + exportMap.put(strategy, exporter); + } + } + } + + return this; + } + + /** + * @return - ExporterManager for managing IncrementalResultExporters and exporting results + */ + public ExporterManager build() { + //adds NoOpExporter in the event that the user does not want to export results + addIncrementalResultExporter(new NoOpExporter()); + return new ExporterManager(exporters); + } + + } + + @Override + public void close() throws Exception { + + Collection<Map<ExportStrategy, IncrementalResultExporter>> values = exporters.values(); + + for(Map<ExportStrategy, IncrementalResultExporter> map: values) { + for(IncrementalResultExporter exporter: map.values()) { + exporter.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java index c2f4cb4..9877671 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporter.java @@ -18,7 +18,6 @@ */ package org.apache.rya.indexing.pcj.fluo.app.export; -import org.apache.fluo.api.client.TransactionBase; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; @@ -29,17 +28,16 @@ import edu.umd.cs.findbugs.annotations.NonNull; * other location. 
*/ @DefaultAnnotation(NonNull.class) -public interface IncrementalBindingSetExporter extends AutoCloseable { +public interface IncrementalBindingSetExporter extends IncrementalResultExporter { /** * Export a Binding Set that is a result of a SPARQL query that does not include a Group By clause. * - * @param tx - The Fluo transaction this export is a part of. (not null) - * @param queryId - The Fluo ID of the SPARQL query the binding set is a result of. (not null) + * @param queryId - The PCJ ID of the SPARQL query the binding set is a result of. (not null) * @param bindingSetString - The Binding Set as it was represented within the Fluo application. (not null) * @throws ResultExportException The result could not be exported. */ - public void export(TransactionBase tx, String queryId, VisibilityBindingSet result) throws ResultExportException; + public void export(String queryId, VisibilityBindingSet result) throws ResultExportException; /** * A result could not be exported. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java deleted file mode 100644 index 1bf492a..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalBindingSetExporterFactory.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.indexing.pcj.fluo.app.export; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -import com.google.common.base.Optional; - -import org.apache.fluo.api.observer.Observer.Context; - -/** - * Builds instances of {@link IncrementalBindingSetExporter} using the provided - * configurations. - */ -@DefaultAnnotation(NonNull.class) -public interface IncrementalBindingSetExporterFactory { - - /** - * Builds an instance of {@link IncrementalBindingSetExporter} using the - * configurations that are provided. - * - * @param context - Contains the host application's configuration values - * and any parameters that were provided at initialization. (not null) - * @return An exporter if configurations were found in the context; otherwise absent. - * @throws IncrementalExporterFactoryException A non-configuration related - * problem has occurred and the exporter could not be created as a result. - * @throws ConfigurationException Thrown if configuration values were - * provided, but an instance of the exporter could not be initialized - * using them. This could be because they were improperly formatted, - * a required field was missing, or some other configuration based problem. 
- */ - public Optional<IncrementalBindingSetExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException; - - /** - * Indicates a {@link IncrementalBindingSetExporter} could not be created by a - * {@link IncrementalBindingSetExporterFactory}. - */ - public static class IncrementalExporterFactoryException extends Exception { - private static final long serialVersionUID = 1L; - - /** - * Constructs an instance of {@link }. - * - * @param message - Explains why this exception is being thrown. - */ - public IncrementalExporterFactoryException(final String message) { - super(message); - } - - /** - * Constructs an instance of {@link }. - * - * @param message - Explains why this exception is being thrown. - * @param cause - The exception that caused this one to be thrown. - */ - public IncrementalExporterFactoryException(final String message, final Throwable t) { - super(message, t); - } - } - - /** - * The configuration could not be interpreted because required fields were - * missing or a value wasn't properly formatted. - */ - public static class ConfigurationException extends IncrementalExporterFactoryException { - private static final long serialVersionUID = 1L; - - /** - * Constructs an instance of {@link ConfigurationException}. - * - * @param message - Explains why this exception is being thrown. - */ - public ConfigurationException(final String message) { - super(message); - } - - /** - * Constructs an instance of {@link ConfigurationException}. - * - * @param message - Explains why this exception is being thrown. - * @param cause - The exception that caused this one to be thrown. 
- */ - public ConfigurationException(final String message, final Throwable cause) { - super(message, cause); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java new file mode 100644 index 0000000..e49a777 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporter.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.app.export; + +import java.util.Set; + +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; + +/** + * Common interface for the different incremental exporters used in the Rya Fluo Application. 
+ * + */ +public interface IncrementalResultExporter extends AutoCloseable { + + /** + * @return - A Set of {@link QueryType}s whose results this exporter handles + */ + public Set<QueryType> getQueryTypes(); + + /** + * @return - The {@link ExportStrategy} indicating where results are exported + */ + public ExportStrategy getExportStrategy(); + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java new file mode 100644 index 0000000..5bba4ab --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalResultExporterFactory.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +import com.google.common.base.Optional; + +import org.apache.fluo.api.observer.Observer.Context; + +/** + * Builds instances of {@link IncrementalResultExporter} using the provided + * configurations. + */ +@DefaultAnnotation(NonNull.class) +public interface IncrementalResultExporterFactory { + + /** + * Builds an instance of {@link IncrementalResultExporter} using the + * configurations that are provided. + * + * @param context - Contains the host application's configuration values + * and any parameters that were provided at initialization. (not null) + * @return An exporter if configurations were found in the context; otherwise absent. + * @throws IncrementalExporterFactoryException A non-configuration related + * problem has occurred and the exporter could not be created as a result. + * @throws ConfigurationException Thrown if configuration values were + * provided, but an instance of the exporter could not be initialized + * using them. This could be because they were improperly formatted, + * a required field was missing, or some other configuration based problem. + */ + public Optional<IncrementalResultExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException; + + /** + * Indicates a {@link IncrementalResultExporter} could not be created by a + * {@link IncrementalResultExporterFactory}. + */ + public static class IncrementalExporterFactoryException extends Exception { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link IncrementalExporterFactoryException}. + * + * @param message - Explains why this exception is being thrown. + */ + public IncrementalExporterFactoryException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link IncrementalExporterFactoryException}.
+ * + * @param message - Explains why this exception is being thrown. + * @param t - The exception that caused this one to be thrown. + */ + public IncrementalExporterFactoryException(final String message, final Throwable t) { + super(message, t); + } + } + + /** + * The configuration could not be interpreted because required fields were + * missing or a value wasn't properly formatted. + */ + public static class ConfigurationException extends IncrementalExporterFactoryException { + private static final long serialVersionUID = 1L; + + /** + * Constructs an instance of {@link ConfigurationException}. + * + * @param message - Explains why this exception is being thrown. + */ + public ConfigurationException(final String message) { + super(message); + } + + /** + * Constructs an instance of {@link ConfigurationException}. + * + * @param message - Explains why this exception is being thrown. + * @param cause - The exception that caused this one to be thrown. + */ + public ConfigurationException(final String message, final Throwable cause) { + super(message, cause); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java index 797502c..7b7f084 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporter.java @@ -25,7 +25,7 @@ import
org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter * from the Rya-Fluo application to the core Rya tables. * */ -public interface IncrementalRyaSubGraphExporter extends AutoCloseable { +public interface IncrementalRyaSubGraphExporter extends IncrementalResultExporter { /** * Export a RyaSubGraph that is the result of SPARQL Construct Query. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java deleted file mode 100644 index ecbec09..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/IncrementalRyaSubGraphExporterFactory.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.apache.rya.indexing.pcj.fluo.app.export; -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import org.apache.fluo.api.observer.Observer.Context; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.ConfigurationException; -import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporterFactory.IncrementalExporterFactoryException; - -import com.google.common.base.Optional; - -/** - * Builds instances of {@link IncrementalRyaSubGraphExporter} using the provided - * configurations. - */ -public interface IncrementalRyaSubGraphExporterFactory { - - /** - * Builds an instance of {@link IncrementalRyaSubGraphExporter} using the - * configurations that are provided. - * - * @param context - Contains the host application's configuration values - * and any parameters that were provided at initialization. (not null) - * @return An exporter if configurations were found in the context; otherwise absent. - * @throws IncrementalExporterFactoryException A non-configuration related - * problem has occurred and the exporter could not be created as a result. - * @throws ConfigurationException Thrown if configuration values were - * provided, but an instance of the exporter could not be initialized - * using them. This could be because they were improperly formatted, - * a required field was missing, or some other configuration based problem. 
- */ - public Optional<IncrementalRyaSubGraphExporter> build(Context context) throws IncrementalExporterFactoryException, ConfigurationException; -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java new file mode 100644 index 0000000..ab7f2ed --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/NoOpExporter.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.app.export; + +import java.util.Set; + +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; +import org.apache.rya.api.domain.RyaSubGraph; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; + +import com.google.common.collect.Sets; + +/** + * This class is a NoOpExporter that can be specified if a user does not + * want their results exported from Fluo. + * + */ +public class NoOpExporter implements IncrementalBindingSetExporter, IncrementalRyaSubGraphExporter { + + @Override + public Set<QueryType> getQueryTypes() { + return Sets.newHashSet(QueryType.CONSTRUCT, QueryType.PROJECTION); + } + + @Override + public ExportStrategy getExportStrategy() { + return ExportStrategy.NO_OP_EXPORT; + } + + @Override + public void close() throws Exception { + } + + @Override + public void export(String constructID, RyaSubGraph subgraph) throws ResultExportException { + } + + @Override + public void export(String queryId, VisibilityBindingSet result) throws ResultExportException { + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/05147266/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java index 7c4b3cc..0c26d65 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/export/kafka/KafkaBindingSetExporter.java @@ -20,18 +20,21 @@ package 
org.apache.rya.indexing.pcj.fluo.app.export.kafka; import static com.google.common.base.Preconditions.checkNotNull; +import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.apache.fluo.api.client.TransactionBase; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.log4j.Logger; +import org.apache.rya.api.client.CreatePCJ.ExportStrategy; +import org.apache.rya.api.client.CreatePCJ.QueryType; import org.apache.rya.indexing.pcj.fluo.app.export.IncrementalBindingSetExporter; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import com.google.common.collect.Sets; + /** * Incrementally exports SPARQL query results to Kafka topics. */ @@ -57,17 +60,15 @@ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { * Send the results to the topic using the queryID as the topicname */ @Override - public void export(final TransactionBase fluoTx, final String queryId, final VisibilityBindingSet result) throws ResultExportException { - checkNotNull(fluoTx); + public void export(final String queryId, final VisibilityBindingSet result) throws ResultExportException { checkNotNull(queryId); checkNotNull(result); try { - final String pcjId = fluoTx.gets(queryId, FluoQueryColumns.RYA_PCJ_ID); - final String msg = "out to kafta topic: queryId=" + queryId + " pcjId=" + pcjId + " result=" + result; + final String msg = "Out to Kafka topic: " + queryId + ", Result: " + result; log.trace(msg); // Send the result to the topic whose name matches the PCJ ID. 
- final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(pcjId, result); + final ProducerRecord<String, VisibilityBindingSet> rec = new ProducerRecord<>(queryId, result); final Future<RecordMetadata> future = producer.send(rec); // Don't let the export return until the result has been written to the topic. Otherwise we may lose results. @@ -84,4 +85,14 @@ public class KafkaBindingSetExporter implements IncrementalBindingSetExporter { public void close() throws Exception { producer.close(5, TimeUnit.SECONDS); } + + @Override + public Set<QueryType> getQueryTypes() { + return Sets.newHashSet(QueryType.PROJECTION); + } + + @Override + public ExportStrategy getExportStrategy() { + return ExportStrategy.KAFKA; + } } \ No newline at end of file
