RYA-282-Nested-Query. Closes #192.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/e387818b Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/e387818b Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/e387818b Branch: refs/heads/master Commit: e387818ba22b07a07432d62eea172da6c33d793f Parents: 6ce0b00 Author: Caleb Meier <[email protected]> Authored: Thu Jul 20 06:57:38 2017 -0700 Committer: Caleb Meier <[email protected]> Committed: Fri Aug 18 11:50:36 2017 -0700 ---------------------------------------------------------------------- .../api/client/accumulo/AccumuloCreatePCJ.java | 3 +- .../api/client/accumulo/AccumuloDeletePCJ.java | 4 +- .../client/accumulo/AccumuloCreatePCJIT.java | 1 + .../rya/api/client/accumulo/FluoITBase.java | 4 + .../indexing/pcj/fluo/api/CreateFluoPcj.java | 385 ++++++++++++++ .../rya/indexing/pcj/fluo/api/CreatePcj.java | 450 ---------------- .../indexing/pcj/fluo/api/DeleteFluoPcj.java | 312 +++++++++++ .../rya/indexing/pcj/fluo/api/DeletePcj.java | 305 ----------- .../pcj/fluo/app/ConstructProjection.java | 4 - .../fluo/app/ConstructQueryResultUpdater.java | 46 +- .../pcj/fluo/app/FilterResultUpdater.java | 23 +- .../pcj/fluo/app/FluoStringConverter.java | 2 - .../fluo/app/IncrementalUpdateConstants.java | 4 + .../pcj/fluo/app/JoinResultUpdater.java | 31 +- .../rya/indexing/pcj/fluo/app/NodeType.java | 57 +- .../pcj/fluo/app/PeriodicQueryUpdater.java | 1 - .../pcj/fluo/app/ProjectionResultUpdater.java | 89 ++++ .../pcj/fluo/app/QueryResultUpdater.java | 20 +- .../export/kafka/KafkaRyaSubGraphExporter.java | 2 - .../fluo/app/observers/BindingSetUpdater.java | 14 +- .../fluo/app/observers/ProjectionObserver.java | 65 +++ .../fluo/app/observers/QueryResultObserver.java | 2 +- .../pcj/fluo/app/query/AggregationMetadata.java | 8 +- .../pcj/fluo/app/query/CommonNodeMetadata.java | 14 + .../fluo/app/query/ConstructQueryMetadata.java | 94 ++-- 
.../pcj/fluo/app/query/FilterMetadata.java | 11 +- .../indexing/pcj/fluo/app/query/FluoQuery.java | 234 ++++++--- .../pcj/fluo/app/query/FluoQueryColumns.java | 45 +- .../fluo/app/query/FluoQueryMetadataDAO.java | 148 ++++-- .../pcj/fluo/app/query/JoinMetadata.java | 17 +- .../fluo/app/query/PeriodicQueryMetadata.java | 9 +- .../pcj/fluo/app/query/ProjectionMetadata.java | 236 +++++++++ .../fluo/app/query/QueryBuilderVisitorBase.java | 119 +++++ .../pcj/fluo/app/query/QueryMetadata.java | 98 +++- .../app/query/QueryMetadataVisitorBase.java | 113 ++++ .../fluo/app/query/SparqlFluoQueryBuilder.java | 444 +++++++++++----- .../app/query/StatementPatternMetadata.java | 7 +- .../pcj/fluo/app/util/FluoQueryUtils.java | 62 +++ .../pcj/fluo/app/util/PeriodicQueryUtil.java | 76 +-- .../app/util/VariableOrderUpdateVisitor.java | 166 ++++++ .../fluo/app/query/PeriodicQueryUtilTest.java | 10 +- .../fluo/app/query/QueryBuilderVisitorTest.java | 105 ++++ .../app/query/QueryMetadataVisitorTest.java | 109 ++++ .../fluo/client/command/NewQueryCommand.java | 4 +- .../fluo/client/util/QueryReportRenderer.java | 41 +- .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 4 +- .../pcj/fluo/ConstructGraphTestUtils.java | 15 +- .../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 4 +- .../indexing/pcj/fluo/api/GetQueryReportIT.java | 4 +- .../fluo/app/query/FluoQueryMetadataDAOIT.java | 163 +++++- .../pcj/fluo/integration/BatchDeleteIT.java | 8 +- .../pcj/fluo/integration/CreateDeleteIT.java | 10 +- .../indexing/pcj/fluo/integration/InputIT.java | 10 +- .../pcj/fluo/integration/KafkaExportIT.java | 163 +++++- .../integration/KafkaRyaSubGraphExportIT.java | 86 ++- .../indexing/pcj/fluo/integration/QueryIT.java | 517 ++++++++++++------- .../pcj/fluo/integration/RyaExportIT.java | 4 +- .../RyaInputIncrementalUpdateIT.java | 8 +- .../pcj/fluo/integration/StreamingTestIT.java | 4 +- .../HistoricStreamingVisibilityIT.java | 4 +- .../pcj/fluo/visibility/PcjVisibilityIT.java | 4 +- 
.../pcj/fluo/test/base/KafkaExportITBase.java | 4 +- .../rya/pcj/fluo/test/base/RyaExportITBase.java | 2 + .../PeriodicNotificationProviderIT.java | 9 +- .../notification/api/CreatePeriodicQuery.java | 10 +- .../recovery/PeriodicNotificationProvider.java | 9 +- 66 files changed, 3569 insertions(+), 1467 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java index 3fe1042..644189a 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java @@ -38,6 +38,7 @@ import org.apache.rya.api.instance.RyaDetailsUpdater; import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.pcj.fluo.api.CreateFluoPcj; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; @@ -145,7 +146,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { cd.getZookeepers(), fluoAppName);) { // Initialize the PCJ within the Fluo application. 
- final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj(); + final CreateFluoPcj fluoCreatePcj = new CreateFluoPcj(); fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java index 96e6d58..eb2b2d7 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java @@ -31,7 +31,7 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; -import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; +import org.apache.rya.indexing.pcj.fluo.api.DeleteFluoPcj; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; @@ -123,7 +123,7 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ { cd.getZookeepers(), fluoAppName)) { // Delete the PCJ from the Fluo App. 
- new DeletePcj(1000).deletePcj(fluoClient, pcjId); + new DeleteFluoPcj(1000).deletePcj(fluoClient, pcjId); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java index 9bbf01f..3463a02 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java @@ -34,6 +34,7 @@ import org.apache.rya.api.instance.RyaDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds; +import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java index 2e16412..113b397 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/FluoITBase.java @@ -39,8 +39,10 @@ import 
org.apache.rya.accumulo.MiniAccumuloClusterInstance; import org.apache.rya.accumulo.MiniAccumuloSingleton; import org.apache.rya.accumulo.RyaTestInstanceRule; import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters; +import org.apache.rya.indexing.pcj.fluo.app.observers.ConstructQueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver; +import org.apache.rya.indexing.pcj.fluo.app.observers.ProjectionObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver; import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver; @@ -219,6 +221,8 @@ public abstract class FluoITBase { observers.add(new ObserverSpecification(StatementPatternObserver.class.getName())); observers.add(new ObserverSpecification(JoinObserver.class.getName())); observers.add(new ObserverSpecification(FilterObserver.class.getName())); + observers.add(new ObserverSpecification(ProjectionObserver.class.getName())); + observers.add(new ObserverSpecification(ConstructQueryResultObserver.class.getName())); // Provide export parameters child test classes may provide to the // export observer. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java new file mode 100644 index 0000000..e450960 --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreateFluoPcj.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Transaction; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.api.persist.query.BatchRyaQuery; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder; +import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.calrissian.mango.collect.CloseableIterable; +import 
org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.algebra.StatementPattern; + +import com.google.common.base.Preconditions; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. + * <p> + * This is a two phase process. + * <ol> + * <li>Setup metadata about each node of the query using a single Fluo transaction. </li> + * <li>Scan Rya for binding sets that match each Statement Pattern from the query + * and use a separate Fluo transaction for each batch that is inserted. This + * ensure historic triples will be included in the query's results.</li> + * </ol> + * After the first step is finished, any new Triples that are added to the Fluo + * application will be matched against statement patterns, the final results + * will percolate to the top of the query, and those results will be exported to + * Rya's query system. + */ +@DefaultAnnotation(NonNull.class) +public class CreateFluoPcj { + private static final Logger log = Logger.getLogger(CreateFluoPcj.class); + + /** + * The default Statement Pattern batch insert size is 1000. + */ + private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000; + + /** + * The maximum number of binding sets that will be inserted into each Statement + * Pattern's result set per Fluo transaction. + */ + private final int spInsertBatchSize; + + /** + * Constructs an instance of {@link CreateFluoPcj} that uses + * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size. + */ + public CreateFluoPcj() { + this(DEFAULT_SP_INSERT_BATCH_SIZE); + } + + /** + * Constructs an instance of {@link CreateFluoPcj}. + * + * @param spInsertBatchSize - The maximum number of binding sets that will be + * inserted into each Statement Pattern's result set per Fluo transaction. 
+ */ + public CreateFluoPcj(final int spInsertBatchSize) { + checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0."); + this.spInsertBatchSize = spInsertBatchSize; + } + + + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method + * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated + * inside of Fluo. This method assumes that the user will export the results to Kafka or + * some other external resource. The export id is equivalent to the queryId that is returned, + * which is in contrast to the other createPcj methods in this class which accept an external pcjId + * that is used to identify the Accumulo table or Kafka topic for exporting results. + * + * @param sparql - sparql query String to be registered with Fluo + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @return The metadata that was written to the Fluo application for the PCJ. + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + */ + public FluoQuery createPcj(String sparql, FluoClient fluo) throws MalformedQueryException { + Preconditions.checkNotNull(sparql); + Preconditions.checkNotNull(fluo); + + String pcjId = UUID.randomUUID().toString().replaceAll("-", ""); + return createPcj(pcjId, sparql, fluo); + } + + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method provides + * no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely + * creates the FluoQuery (metadata) inside of Fluo so that results and be incrementally generated + * inside of Fluo. This method assumes that the user will export the results to Kafka or + * some other external resource. 
+ * + * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) + * @param sparql - sparql query String to be registered with Fluo + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @return The metadata that was written to the Fluo application for the PCJ. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + */ + public FluoQuery createPcj( + final String pcjId, + final String sparql, + final FluoClient fluo) throws MalformedQueryException { + requireNonNull(pcjId); + requireNonNull(sparql); + requireNonNull(fluo); + + FluoQuery fluoQuery = makeFluoQuery(sparql, pcjId); + writeFluoQuery(fluo, fluoQuery, pcjId); + + return fluoQuery; + } + + private FluoQuery makeFluoQuery(String sparql, String pcjId) throws MalformedQueryException { + + String queryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId); + + SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder(); + builder.setFluoQueryId(queryId); + builder.setSparql(sparql); + + return builder.build(); + } + + private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) { + try (Transaction tx = fluo.newTransaction()) { + // Write the query's structure to Fluo. + new FluoQueryMetadataDAO().write(tx, fluoQuery); + + // Flush the changes to Fluo. + tx.commit(); + } + } + + + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an + * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists. + * + * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) + * @param pcjStorage - Provides access to the PCJ index. (not null) + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @return The metadata that was written to the Fluo application for the PCJ. 
+ * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + */ + public FluoQuery createPcj( + final String pcjId, + final PrecomputedJoinStorage pcjStorage, + final FluoClient fluo) throws MalformedQueryException, PcjException { + requireNonNull(pcjId); + requireNonNull(pcjStorage); + requireNonNull(fluo); + + // Parse the query's structure for the metadata that will be written to fluo. + final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = pcjMetadata.getSparql(); + return createPcj(pcjId, sparql, fluo); + } + + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. + * <p> + * This call scans Rya for Statement Pattern matches and inserts them into + * the Fluo application. This method does not verify that a PcjTable with the + * the given pcjId actually exists. It is assumed that results for any query registered + * using this method will be exported to Kafka or some other external service. + * + * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) + * @param sparql - sparql query that will registered with Fluo. (not null) + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @param queryEngine - QueryEngine for a given Rya Instance, (not null) + * @return The Fluo application's Query ID of the query that was created. + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. 
+ */ + public String withRyaIntegration( + final String pcjId, + final String sparql, + final FluoClient fluo, + final Connector accumulo, + final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException { + requireNonNull(pcjId); + requireNonNull(sparql); + requireNonNull(fluo); + requireNonNull(accumulo); + requireNonNull(ryaInstance); + + + // Write the SPARQL query's structure to the Fluo Application. + final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo); + //import results already ingested into Rya that match query + importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance); + // return queryId to the caller for later monitoring from the export. + return fluoQuery.getQueryMetadata().getNodeId(); + } + + + /** + * Tells the Fluo PCJ Updater application to maintain a new PCJ. + * <p> + * This call scans Rya for Statement Pattern matches and inserts them into + * the Fluo application. The Fluo application will then maintain the intermediate + * results as new triples are inserted and export any new query results to the + * {@code pcjId} within the provided {@code pcjStorage}. This method requires that a + * PCJ table already exist for the query corresponding to the pcjId. Results will be exported + * to this table. + * + * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) + * @param pcjStorage - Provides access to the PCJ index. (not null) + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @param queryEngine - QueryEngine for a given Rya Instance, (not null) + * @return The Fluo application's Query ID of the query that was created. + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. 
+ */ + public String withRyaIntegration( + final String pcjId, + final PrecomputedJoinStorage pcjStorage, + final FluoClient fluo, + final Connector accumulo, + final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException { + requireNonNull(pcjId); + requireNonNull(pcjStorage); + requireNonNull(fluo); + requireNonNull(accumulo); + requireNonNull(ryaInstance); + + // Parse the query's structure for the metadata that will be written to fluo. + final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = pcjMetadata.getSparql(); + + return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance); + } + + private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance) + throws RyaDAOException { + // Reuse the same set object while performing batch inserts. + final Set<RyaStatement> queryBatch = new HashSet<>(); + + // Iterate through each of the statement patterns and insert their + // historic matches into Fluo. + for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) { + // Get an iterator over all of the binding sets that match the + // statement pattern. + final StatementPattern pattern = FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern()); + queryBatch.add(spToRyaStatement(pattern)); + } + + // Create AccumuloRyaQueryEngine to query for historic results + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(ryaInstance); + conf.setAuths(getAuths(accumulo)); + + try (final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf); + CloseableIterable<RyaStatement> queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch))) { + final Set<RyaStatement> triplesBatch = new HashSet<>(); + + // Insert batches of the binding sets into Fluo. 
+ for (final RyaStatement ryaStatement : queryIterable) { + if (triplesBatch.size() == spInsertBatchSize) { + writeBatch(fluo, triplesBatch); + triplesBatch.clear(); + } + + triplesBatch.add(ryaStatement); + } + + if (!triplesBatch.isEmpty()) { + writeBatch(fluo, triplesBatch); + triplesBatch.clear(); + } + } catch (final IOException e) { + log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e); + } + } + + private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) { + checkNotNull(fluo); + checkNotNull(batch); + new InsertTriples().insert(fluo, batch); + } + + private static RyaStatement spToRyaStatement(final StatementPattern sp) { + final Value subjVal = sp.getSubjectVar().getValue(); + final Value predVal = sp.getPredicateVar().getValue(); + final Value objVal = sp.getObjectVar().getValue(); + + RyaURI subjURI = null; + RyaURI predURI = null; + RyaType objType = null; + + if(subjVal != null) { + if(!(subjVal instanceof Resource)) { + throw new AssertionError("Subject must be a Resource."); + } + subjURI = RdfToRyaConversions.convertResource((Resource) subjVal); + } + + if (predVal != null) { + if(!(predVal instanceof URI)) { + throw new AssertionError("Predicate must be a URI."); + } + predURI = RdfToRyaConversions.convertURI((URI) predVal); + } + + if (objVal != null ) { + objType = RdfToRyaConversions.convertValue(objVal); + } + + return new RyaStatement(subjURI, predURI, objType); + } + + private String[] getAuths(final Connector accumulo) { + Authorizations auths; + try { + auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami()); + final List<byte[]> authList = auths.getAuthorizations(); + final String[] authArray = new String[authList.size()]; + for(int i = 0; i < authList.size(); i++){ + authArray[i] = new String(authList.get(i), "UTF-8"); + } + return authArray; + } catch (AccumuloException | AccumuloSecurityException | UnsupportedEncodingException e) { + 
throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java deleted file mode 100644 index 767d9d2..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java +++ /dev/null @@ -1,450 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.rya.indexing.pcj.fluo.api; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; -import static java.util.Objects.requireNonNull; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.Set; - -import org.apache.accumulo.core.client.AccumuloException; -import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.security.Authorizations; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Transaction; -import org.apache.log4j.Logger; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.api.domain.RyaType; -import org.apache.rya.api.domain.RyaURI; -import org.apache.rya.api.persist.RyaDAOException; -import org.apache.rya.api.persist.query.BatchRyaQuery; -import org.apache.rya.api.resolver.RdfToRyaConversions; -import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; -import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; -import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; -import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder; -import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; -import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import 
org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.calrissian.mango.collect.CloseableIterable; -import org.openrdf.model.Resource; -import org.openrdf.model.URI; -import org.openrdf.model.Value; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.sparql.SPARQLParser; - -import com.google.common.base.Preconditions; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - -/** - * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. - * <p> - * This is a two phase process. - * <ol> - * <li>Setup metadata about each node of the query using a single Fluo transaction. </li> - * <li>Scan Rya for binding sets that match each Statement Pattern from the query - * and use a separate Fluo transaction for each batch that is inserted. This - * ensure historic triples will be included in the query's results.</li> - * </ol> - * After the first step is finished, any new Triples that are added to the Fluo - * application will be matched against statement patterns, the final results - * will percolate to the top of the query, and those results will be exported to - * Rya's query system. - */ -@DefaultAnnotation(NonNull.class) -public class CreatePcj { - private static final Logger log = Logger.getLogger(CreatePcj.class); - - /** - * The default Statement Pattern batch insert size is 1000. - */ - private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000; - - /** - * The maximum number of binding sets that will be inserted into each Statement - * Pattern's result set per Fluo transaction. - */ - private final int spInsertBatchSize; - - /** - * Constructs an instance of {@link CreatePcj} that uses - * {@link #DEFAULT_SP_INSERT_BATCH_SIZE} as the default batch insert size. 
- */ - public CreatePcj() { - this(DEFAULT_SP_INSERT_BATCH_SIZE); - } - - /** - * Constructs an instance of {@link CreatePcj}. - * - * @param spInsertBatchSize - The maximum number of binding sets that will be - * inserted into each Statement Pattern's result set per Fluo transaction. - */ - public CreatePcj(final int spInsertBatchSize) { - checkArgument(spInsertBatchSize > 0, "The SP insert batch size '" + spInsertBatchSize + "' must be greater than 0."); - this.spInsertBatchSize = spInsertBatchSize; - } - - - /** - * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method does not - * require a pcjId and does not require a PCJ table to have already been created via {@link PrecomputedJoinStorage}. - * This method only adds the metadata to the Fluo table to incrementally generate query results. Since there - * is no PCJ table, the incremental results must be exported to some external queuing service such as Kafka. - * This method currently only supports SPARQL COSNTRUCT queries, as they only export to Kafka by default. - * - * @param sparql - SPARQL query whose results will be updated in the Fluo table - * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) - * @return The metadata that was written to the Fluo application for the PCJ. - * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. - * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. - * @throws RuntimeException If SPARQL query is not a CONSTRUCT query. - */ - public FluoQuery createFluoPcj(final FluoClient fluo, String sparql) throws MalformedQueryException, PcjException { - requireNonNull(sparql); - requireNonNull(fluo); - - // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo. - // We use these IDs later when scanning Rya for historic Statement Pattern matches - // as well as setting up automatic exports. 
- final NodeIds nodeIds = new NodeIds(); - final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); - final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); - checkArgument(fluoQuery.getConstructQueryMetadata().isPresent(), "Sparql query: " + sparql + " must begin with a construct."); - - try (Transaction tx = fluo.newTransaction()) { - // Write the query's structure to Fluo. - new FluoQueryMetadataDAO().write(tx, fluoQuery); - tx.commit(); - } - - return fluoQuery; - } - - - - - /** - * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method - * creates the FluoQuery (metadata) inside of Fluo so that results can be incrementally generated - * inside of Fluo. This method assumes that the user will export the results to Kafka or - * some other external resource. The export id is equivalent to the queryId that is returned, - * which is in contrast to the other createPcj methods in this class which accept an external pcjId - * that is used to identify the Accumulo table or Kafka topic for exporting results. - * - * @param sparql - sparql query String to be registered with Fluo - * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) - * @return queryId - The id of the root of the query metadata tree in Fluo - * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. - * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. 
- */ - public String createPcj(String sparql, FluoClient fluo) throws MalformedQueryException { - Preconditions.checkNotNull(sparql); - Preconditions.checkNotNull(fluo); - - FluoQuery fluoQuery = makeFluoQuery(sparql); - String queryId = null; - if(fluoQuery.getQueryMetadata().isPresent()) { - queryId = fluoQuery.getQueryMetadata().get().getNodeId(); - queryId = queryId.split(IncrementalUpdateConstants.QUERY_PREFIX)[1]; - } else { - queryId = fluoQuery.getConstructQueryMetadata().get().getNodeId(); - queryId = queryId.split(IncrementalUpdateConstants.CONSTRUCT_PREFIX)[1]; - } - - String[] idArray = queryId.split("_"); - String id = idArray[idArray.length - 1]; - - writeFluoQuery(fluo, fluoQuery, id); - return id; - } - - /** - * Tells the Fluo PCJ Updater application to maintain a new PCJ. This method provides - * no guarantees that a PCJ with the given pcjId exists outside of Fluo. This method merely - * creates the FluoQuery (metadata) inside of Fluo so that results and be incrementally generated - * inside of Fluo. This method assumes that the user will export the results to Kafka or - * some other external resource. - * - * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) - * @param sparql - sparql query String to be registered with Fluo - * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) - * @return The metadata that was written to the Fluo application for the PCJ. - * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. - * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. 
- */ - public FluoQuery createPcj( - final String pcjId, - final String sparql, - final FluoClient fluo) throws MalformedQueryException, PcjException { - requireNonNull(pcjId); - requireNonNull(sparql); - requireNonNull(fluo); - - FluoQuery fluoQuery = makeFluoQuery(sparql); - writeFluoQuery(fluo, fluoQuery, pcjId); - - return fluoQuery; - } - - private FluoQuery makeFluoQuery(String sparql) throws MalformedQueryException { - - // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo. - // We use these IDs later when scanning Rya for historic Statement Pattern matches - // as well as setting up automatic exports. - final NodeIds nodeIds = new NodeIds(); - - // Parse the query's structure for the metadata that will be written to fluo. - final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); - return new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); - } - - private void writeFluoQuery(FluoClient fluo, FluoQuery fluoQuery, String pcjId) { - try (Transaction tx = fluo.newTransaction()) { - // Write the query's structure to Fluo. - new FluoQueryMetadataDAO().write(tx, fluoQuery); - - // The results of the query are eventually exported to an instance - // of Rya, so store the Rya ID for the PCJ. - QueryMetadata metadata = fluoQuery.getQueryMetadata().orNull(); - if (metadata != null) { - String queryId = metadata.getNodeId(); - tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); - tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); - } - - // Flush the changes to Fluo. - tx.commit(); - } - } - - - /** - * Tells the Fluo PCJ Updater application to maintain a new PCJ. The method takes in an - * instance of {@link PrecomputedJoinStorage} to verify that a PCJ with the given pcjId exists. - * - * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) - * @param pcjStorage - Provides access to the PCJ index. 
(not null) - * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) - * @return The metadata that was written to the Fluo application for the PCJ. - * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. - * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. - */ - public FluoQuery createPcj( - final String pcjId, - final PrecomputedJoinStorage pcjStorage, - final FluoClient fluo) throws MalformedQueryException, PcjException { - requireNonNull(pcjId); - requireNonNull(pcjStorage); - requireNonNull(fluo); - - // Parse the query's structure for the metadata that will be written to fluo. - final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); - final String sparql = pcjMetadata.getSparql(); - return createPcj(pcjId, sparql, fluo); - } - - /** - * Tells the Fluo PCJ Updater application to maintain a new PCJ. - * <p> - * This call scans Rya for Statement Pattern matches and inserts them into - * the Fluo application. This method does not verify that a PcjTable with the - * the given pcjId actually exists. It is assumed that results for any query registered - * using this method will be exported to Kafka or some other external service. - * - * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) - * @param sparql - sparql query that will registered with Fluo. (not null) - * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) - * @param queryEngine - QueryEngine for a given Rya Instance, (not null) - * @return The Fluo application's Query ID of the query that was created. - * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. - * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. 
- * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. - */ - public String withRyaIntegration( - final String pcjId, - final String sparql, - final FluoClient fluo, - final Connector accumulo, - final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException { - requireNonNull(pcjId); - requireNonNull(sparql); - requireNonNull(fluo); - requireNonNull(accumulo); - requireNonNull(ryaInstance); - - - // Write the SPARQL query's structure to the Fluo Application. - final FluoQuery fluoQuery = createPcj(pcjId, sparql, fluo); - //import results already ingested into Rya that match query - importHistoricResultsIntoFluo(fluo, fluoQuery, accumulo, ryaInstance); - // return queryId to the caller for later monitoring from the export. - return fluoQuery.getQueryMetadata().get().getNodeId(); - } - - - /** - * Tells the Fluo PCJ Updater application to maintain a new PCJ. - * <p> - * This call scans Rya for Statement Pattern matches and inserts them into - * the Fluo application. The Fluo application will then maintain the intermediate - * results as new triples are inserted and export any new query results to the - * {@code pcjId} within the provided {@code pcjStorage}. This method requires that a - * PCJ table already exist for the query corresponding to the pcjId. Results will be exported - * to this table. - * - * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) - * @param pcjStorage - Provides access to the PCJ index. (not null) - * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) - * @param queryEngine - QueryEngine for a given Rya Instance, (not null) - * @return The Fluo application's Query ID of the query that was created. - * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. - * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. 
- * @throws RyaDAOException Historic PCJ results could not be loaded because of a problem with {@code rya}. - */ - public String withRyaIntegration( - final String pcjId, - final PrecomputedJoinStorage pcjStorage, - final FluoClient fluo, - final Connector accumulo, - final String ryaInstance ) throws MalformedQueryException, PcjException, RyaDAOException { - requireNonNull(pcjId); - requireNonNull(pcjStorage); - requireNonNull(fluo); - requireNonNull(accumulo); - requireNonNull(ryaInstance); - - // Parse the query's structure for the metadata that will be written to fluo. - final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); - final String sparql = pcjMetadata.getSparql(); - - return withRyaIntegration(pcjId, sparql, fluo, accumulo, ryaInstance); - } - - private void importHistoricResultsIntoFluo(FluoClient fluo, FluoQuery fluoQuery, Connector accumulo, String ryaInstance) - throws RyaDAOException { - // Reuse the same set object while performing batch inserts. - final Set<RyaStatement> queryBatch = new HashSet<>(); - - // Iterate through each of the statement patterns and insert their - // historic matches into Fluo. - for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) { - // Get an iterator over all of the binding sets that match the - // statement pattern. 
- final StatementPattern pattern = FluoStringConverter.toStatementPattern(patternMetadata.getStatementPattern()); - queryBatch.add(spToRyaStatement(pattern)); - } - - // Create AccumuloRyaQueryEngine to query for historic results - final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix(ryaInstance); - conf.setAuths(getAuths(accumulo)); - - try (final AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf); - CloseableIterable<RyaStatement> queryIterable = queryEngine.query(new BatchRyaQuery(queryBatch))) { - final Set<RyaStatement> triplesBatch = new HashSet<>(); - - // Insert batches of the binding sets into Fluo. - for (final RyaStatement ryaStatement : queryIterable) { - if (triplesBatch.size() == spInsertBatchSize) { - writeBatch(fluo, triplesBatch); - triplesBatch.clear(); - } - - triplesBatch.add(ryaStatement); - } - - if (!triplesBatch.isEmpty()) { - writeBatch(fluo, triplesBatch); - triplesBatch.clear(); - } - } catch (final IOException e) { - log.warn("Ignoring IOException thrown while closing the AccumuloRyaQueryEngine used by CreatePCJ.", e); - } - } - - private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) { - checkNotNull(fluo); - checkNotNull(batch); - new InsertTriples().insert(fluo, batch); - } - - private static RyaStatement spToRyaStatement(final StatementPattern sp) { - final Value subjVal = sp.getSubjectVar().getValue(); - final Value predVal = sp.getPredicateVar().getValue(); - final Value objVal = sp.getObjectVar().getValue(); - - RyaURI subjURI = null; - RyaURI predURI = null; - RyaType objType = null; - - if(subjVal != null) { - if(!(subjVal instanceof Resource)) { - throw new AssertionError("Subject must be a Resource."); - } - subjURI = RdfToRyaConversions.convertResource((Resource) subjVal); - } - - if (predVal != null) { - if(!(predVal instanceof URI)) { - throw new AssertionError("Predicate must be a URI."); - } - predURI = 
RdfToRyaConversions.convertURI((URI) predVal); - } - - if (objVal != null ) { - objType = RdfToRyaConversions.convertValue(objVal); - } - - return new RyaStatement(subjURI, predURI, objType); - } - - private String[] getAuths(final Connector accumulo) { - Authorizations auths; - try { - auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami()); - final List<byte[]> authList = auths.getAuthorizations(); - final String[] authArray = new String[authList.size()]; - for(int i = 0; i < authList.size(); i++){ - authArray[i] = new String(authList.get(i), "UTF-8"); - } - return authArray; - } catch (AccumuloException | AccumuloSecurityException | UnsupportedEncodingException e) { - throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami()); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java new file mode 100644 index 0000000..58a52fb --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeleteFluoPcj.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.fluo.api; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.client.scanner.CellScanner; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumnValue; +import org.apache.fluo.api.data.Span; +import org.apache.rya.indexing.pcj.fluo.app.NodeType; +import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO; +import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.ProjectionMetadata; +import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata; +import org.openrdf.query.BindingSet; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Deletes a Pre-computed Join (PCJ) from Fluo. + * <p> + * This is a two phase process. 
+ * <ol> + * <li>Delete metadata about each node of the query using a single Fluo + * transaction. This prevents new {@link BindingSet}s from being created when + * new triples are inserted.</li> + * <li>Delete BindingSets associated with each node of the query. This is done + * in a batch fashion to guard against large delete transactions that don't fit + * into memory.</li> + * </ol> + */ +@DefaultAnnotation(NonNull.class) +public class DeleteFluoPcj { + + private final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO(); + private final int batchSize; + + /** + * Constructs an instance of {@link DeleteFluoPcj}. + * + * @param batchSize - The number of entries that will be deleted at a time. (> 0) + */ + public DeleteFluoPcj(final int batchSize) { + checkArgument(batchSize > 0); + this.batchSize = batchSize; + } + + /** + * Deletes all metadata and {@link BindingSet}s associated with a Rya + * Precomputed Join Index from the Fluo application that is incrementally + * updating it. + * + * @param client - Connects to the Fluo application that is updating the PCJ + * Index. (not null) + * @param pcjId - The PCJ ID for the query that will be removed from the Fluo + * application. (not null) + */ + public void deletePcj(final FluoClient client, final String pcjId) { + requireNonNull(client); + requireNonNull(pcjId); + + final Transaction tx = client.newTransaction(); + + // Delete the query's metadata. This halts input. + final List<String> nodeIds = getNodeIds(tx, pcjId); + deleteMetadata(tx, nodeIds, pcjId); + + // Delete the binding sets associated with the query's nodes. + for (final String nodeId : nodeIds) { + deleteData(client, nodeId); + } + } + + /** + * This method retrieves all of the nodeIds that are part of the query with + * specified pcjId. + * + * @param tx - Transaction of a given Fluo table. (not null) + * @param pcjId - Id of query. (not null) + * @return list of Node IDs associated with the query {@code pcjId}. 
+ */ + private List<String> getNodeIds(Transaction tx, String pcjId) { + requireNonNull(tx); + requireNonNull(pcjId); + + // Get the ID that tracks the query within the Fluo application. + final String queryId = getQueryIdFromPcjId(tx, pcjId); + + // Get the query's children nodes. + final List<String> nodeIds = new ArrayList<>(); + nodeIds.add(queryId); + getChildNodeIds(tx, queryId, nodeIds); + return nodeIds; + } + + /** + * Recursively navigate query tree to extract all of the nodeIds. + * + * @param tx - Transaction of a given Fluo table. (not null) + * @param nodeId - Current node in query tree. (not null) + * @param nodeIds - The Node IDs extracted from query tree. (not null) + */ + private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) { + requireNonNull(tx); + requireNonNull(nodeId); + requireNonNull(nodeIds); + + final NodeType type = NodeType.fromNodeId(nodeId).get(); + switch (type) { + case QUERY: + final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId); + final String queryChild = queryMeta.getChildNodeId(); + nodeIds.add(queryChild); + getChildNodeIds(tx, queryChild, nodeIds); + break; + case CONSTRUCT: + final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId); + final String constructChild = constructMeta.getChildNodeId(); + nodeIds.add(constructChild); + getChildNodeIds(tx, constructChild, nodeIds); + break; + case JOIN: + final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId); + final String lchild = joinMeta.getLeftChildNodeId(); + final String rchild = joinMeta.getRightChildNodeId(); + nodeIds.add(lchild); + nodeIds.add(rchild); + getChildNodeIds(tx, lchild, nodeIds); + getChildNodeIds(tx, rchild, nodeIds); + break; + case FILTER: + final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId); + final String filterChild = filterMeta.getChildNodeId(); + nodeIds.add(filterChild); + getChildNodeIds(tx, filterChild, nodeIds); + break; + case 
AGGREGATION: + final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId); + final String aggChild = aggMeta.getChildNodeId(); + nodeIds.add(aggChild); + getChildNodeIds(tx, aggChild, nodeIds); + break; + case PERIODIC_QUERY: + final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId); + final String periodicChild = periodicMeta.getChildNodeId(); + nodeIds.add(periodicChild); + getChildNodeIds(tx, periodicChild, nodeIds); + break; + case PROJECTION: + final ProjectionMetadata projectionMetadata = dao.readProjectionMetadata(tx, nodeId); + final String projectionChild = projectionMetadata.getChildNodeId(); + nodeIds.add(projectionChild); + getChildNodeIds(tx, projectionChild, nodeIds); + break; + case STATEMENT_PATTERN: + break; + } + } + + /** + * Deletes metadata for all nodeIds associated with a given queryId in a + * single transaction. Prevents additional BindingSets from being created as + * new triples are added. + * + * @param tx - Transaction of a given Fluo table. (not null) + * @param nodeIds - Nodes whose metadata will be deleted. (not null) + * @param pcjId - The PCJ ID of the query that will be deleted. (not null) + */ + private void deleteMetadata(final Transaction tx, final List<String> nodeIds, final String pcjId) { + requireNonNull(tx); + requireNonNull(nodeIds); + requireNonNull(pcjId); + + try (final Transaction typeTx = tx) { + deletePcjIdAndSparqlMetadata(typeTx, pcjId); + + for (final String nodeId : nodeIds) { + final NodeType type = NodeType.fromNodeId(nodeId).get(); + deleteMetadataColumns(typeTx, nodeId, type.getMetaDataColumns()); + } + typeTx.commit(); + } + } + + /** + * Deletes all metadata for a Query Node. + * + * @param tx - Transaction the deletes will be performed with. (not null) + * @param nodeId - The Node ID of the query node to delete. (not null) + * @param columns - The columns that will be deleted. 
(not null) + */ + private void deleteMetadataColumns(final Transaction tx, final String nodeId, final List<Column> columns) { + requireNonNull(tx); + requireNonNull(columns); + requireNonNull(nodeId); + + final Bytes row = Bytes.of(nodeId); + for (final Column column : columns) { + tx.delete(row, column); + } + } + + /** + * Deletes high level query meta for converting from queryId to pcjId and + * vice versa, as well as converting from sparql to queryId. + * + * @param tx - Transaction the deletes will be performed with. (not null) + * @param pcjId - The PCJ whose metadata will be deleted. (not null) + */ + private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) { + requireNonNull(tx); + requireNonNull(pcjId); + + final String queryId = getQueryIdFromPcjId(tx, pcjId); + final String sparql = getSparqlFromQueryId(tx, queryId); + tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID); + tx.delete(sparql, FluoQueryColumns.QUERY_ID); + tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID); + } + + /** + * Deletes all results (BindingSets or Statements) associated with the specified nodeId. + * + * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null) + * @param client - Used to delete the data. 
(not null) + */ + private void deleteData(final FluoClient client, final String nodeId) { + requireNonNull(client); + requireNonNull(nodeId); + + final NodeType type = NodeType.fromNodeId(nodeId).get(); + Transaction tx = client.newTransaction(); + while (deleteDataBatch(tx, getIterator(tx, nodeId, type.getResultColumn()), type.getResultColumn())) { + tx = client.newTransaction(); + } + } + + private CellScanner getIterator(final Transaction tx, final String nodeId, final Column column) { + requireNonNull(tx); + requireNonNull(nodeId); + requireNonNull(column); + + return tx.scanner().fetch(column).over(Span.prefix(nodeId)).build(); + } + + private boolean deleteDataBatch(final Transaction tx, final CellScanner scanner, final Column column) { + requireNonNull(tx); + requireNonNull(scanner); + requireNonNull(column); + + try (Transaction ntx = tx) { + int count = 0; + final Iterator<RowColumnValue> iter = scanner.iterator(); + while (iter.hasNext() && count < batchSize) { + final Bytes row = iter.next().getRow(); + count++; + tx.delete(row, column); + } + + final boolean hasNext = iter.hasNext(); + tx.commit(); + return hasNext; + } + } + + private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) { + requireNonNull(tx); + requireNonNull(pcjId); + + final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID); + return queryIdBytes.toString(); + } + + private String getSparqlFromQueryId(final Transaction tx, final String queryId) { + requireNonNull(tx); + requireNonNull(queryId); + + final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId); + return metadata.getSparql(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java deleted file mode 100644 index 3052c1d..0000000 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/DeletePcj.java +++ /dev/null @@ -1,305 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
package org.apache.rya.indexing.pcj.fluo.api;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Transaction;
import org.apache.fluo.api.client.scanner.CellScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.Column;
import org.apache.fluo.api.data.RowColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.query.AggregationMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns;
import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryMetadataDAO;
import org.apache.rya.indexing.pcj.fluo.app.query.JoinMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.PeriodicQueryMetadata;
import org.apache.rya.indexing.pcj.fluo.app.query.QueryMetadata;
import org.openrdf.query.BindingSet;

import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
import edu.umd.cs.findbugs.annotations.NonNull;

/**
 * Deletes a Pre-computed Join (PCJ) from Fluo.
 * <p>
 * This is a two phase process.
 * <ol>
 * <li>Delete metadata about each node of the query using a single Fluo
 * transaction. This prevents new {@link BindingSet}s from being created when
 * new triples are inserted.</li>
 * <li>Delete BindingSets associated with each node of the query. This is done
 * in a batch fashion to guard against large delete transactions that don't fit
 * into memory.</li>
 * </ol>
 */
@DefaultAnnotation(NonNull.class)
public class DeletePcj {

    private final FluoQueryMetadataDAO dao = new FluoQueryMetadataDAO();

    // Maximum number of result rows deleted per transaction during phase 2.
    private final int batchSize;

    /**
     * Constructs an instance of {@link DeletePcj}.
     *
     * @param batchSize - The number of entries that will be deleted at a time. (> 0)
     */
    public DeletePcj(final int batchSize) {
        checkArgument(batchSize > 0, "batchSize must be greater than 0, but was %s", batchSize);
        this.batchSize = batchSize;
    }

    /**
     * Deletes all metadata and {@link BindingSet}s associated with a Rya
     * Precomputed Join Index from the Fluo application that is incrementally
     * updating it.
     *
     * @param client - Connects to the Fluo application that is updating the PCJ
     *   Index. (not null)
     * @param pcjId - The PCJ ID for the query that will removed from the Fluo
     *   application. (not null)
     */
    public void deletePcj(final FluoClient client, final String pcjId) {
        requireNonNull(client);
        requireNonNull(pcjId);

        final List<String> nodeIds;

        // Phase 1: Delete the query's metadata within a single transaction.
        // This halts input for the query. The transaction is owned here so it
        // is always closed, even if reading the node IDs fails.
        try (Transaction tx = client.newTransaction()) {
            nodeIds = getNodeIds(tx, pcjId);
            deleteMetadata(tx, nodeIds, pcjId);
            tx.commit();
        }

        // Phase 2: Delete the binding sets associated with the query's nodes,
        // batched so a single delete transaction never grows unbounded.
        for (final String nodeId : nodeIds) {
            deleteData(client, nodeId);
        }
    }

    /**
     * This method retrieves all of the nodeIds that are part of the query with
     * specified pcjId.
     *
     * @param tx - Transaction of a given Fluo table. (not null)
     * @param pcjId - Id of query. (not null)
     * @return list of Node IDs associated with the query {@code pcjId}.
     */
    private List<String> getNodeIds(final Transaction tx, final String pcjId) {
        requireNonNull(tx);
        requireNonNull(pcjId);

        // Get the ID that tracks the query within the Fluo application.
        final String queryId = getQueryIdFromPcjId(tx, pcjId);

        // Get the query's children nodes.
        final List<String> nodeIds = new ArrayList<>();
        nodeIds.add(queryId);
        getChildNodeIds(tx, queryId, nodeIds);
        return nodeIds;
    }

    /**
     * Recursively navigate query tree to extract all of the nodeIds.
     *
     * @param tx - Transaction of a given Fluo table. (not null)
     * @param nodeId - Current node in query tree. (not null)
     * @param nodeIds - The Node IDs extracted from query tree. (not null)
     */
    private void getChildNodeIds(final Transaction tx, final String nodeId, final List<String> nodeIds) {
        requireNonNull(tx);
        requireNonNull(nodeId);
        requireNonNull(nodeIds);

        // fromNodeId only returns absent for IDs with an unknown prefix; all
        // IDs reachable from a query's metadata are expected to be well-formed.
        final NodeType type = NodeType.fromNodeId(nodeId).get();
        switch (type) {
            case QUERY:
                final QueryMetadata queryMeta = dao.readQueryMetadata(tx, nodeId);
                final String queryChild = queryMeta.getChildNodeId();
                nodeIds.add(queryChild);
                getChildNodeIds(tx, queryChild, nodeIds);
                break;
            case CONSTRUCT:
                final ConstructQueryMetadata constructMeta = dao.readConstructQueryMetadata(tx, nodeId);
                final String constructChild = constructMeta.getChildNodeId();
                nodeIds.add(constructChild);
                getChildNodeIds(tx, constructChild, nodeIds);
                break;
            case JOIN:
                // Joins are the only binary nodes; recurse down both sides.
                final JoinMetadata joinMeta = dao.readJoinMetadata(tx, nodeId);
                final String lchild = joinMeta.getLeftChildNodeId();
                final String rchild = joinMeta.getRightChildNodeId();
                nodeIds.add(lchild);
                nodeIds.add(rchild);
                getChildNodeIds(tx, lchild, nodeIds);
                getChildNodeIds(tx, rchild, nodeIds);
                break;
            case FILTER:
                final FilterMetadata filterMeta = dao.readFilterMetadata(tx, nodeId);
                final String filterChild = filterMeta.getChildNodeId();
                nodeIds.add(filterChild);
                getChildNodeIds(tx, filterChild, nodeIds);
                break;
            case AGGREGATION:
                final AggregationMetadata aggMeta = dao.readAggregationMetadata(tx, nodeId);
                final String aggChild = aggMeta.getChildNodeId();
                nodeIds.add(aggChild);
                getChildNodeIds(tx, aggChild, nodeIds);
                break;
            case PERIODIC_QUERY:
                final PeriodicQueryMetadata periodicMeta = dao.readPeriodicQueryMetadata(tx, nodeId);
                final String periodicChild = periodicMeta.getChildNodeId();
                nodeIds.add(periodicChild);
                getChildNodeIds(tx, periodicChild, nodeIds);
                break;
            case STATEMENT_PATTERN:
                // Leaf node; no children to traverse.
                break;
        }
    }

    /**
     * Deletes metadata for all nodeIds associated with a given queryId.
     * Prevents additional BindingSets from being created as new triples are
     * added. The caller owns the transaction and is responsible for committing
     * and closing it.
     *
     * @param tx - Transaction of a given Fluo table. (not null)
     * @param nodeIds - Nodes whose metadata will be deleted. (not null)
     * @param pcjId - The PCJ ID of the query whose metadata will be deleted. (not null)
     */
    private void deleteMetadata(final Transaction tx, final List<String> nodeIds, final String pcjId) {
        requireNonNull(tx);
        requireNonNull(nodeIds);
        requireNonNull(pcjId);

        deletePcjIdAndSparqlMetadata(tx, pcjId);

        for (final String nodeId : nodeIds) {
            final NodeType type = NodeType.fromNodeId(nodeId).get();
            deleteMetadataColumns(tx, nodeId, type.getMetaDataColumns());
        }
    }

    /**
     * Deletes all metadata for a Query Node.
     *
     * @param tx - Transaction the deletes will be performed with. (not null)
     * @param nodeId - The Node ID of the query node to delete. (not null)
     * @param columns - The columns that will be deleted. (not null)
     */
    private void deleteMetadataColumns(final Transaction tx, final String nodeId, final List<Column> columns) {
        requireNonNull(tx);
        requireNonNull(columns);
        requireNonNull(nodeId);

        final Bytes row = Bytes.of(nodeId);
        for (final Column column : columns) {
            tx.delete(row, column);
        }
    }

    /**
     * Deletes high level query meta for converting from queryId to pcjId and
     * vice versa, as well as converting from sparql to queryId.
     *
     * @param tx - Transaction the deletes will be performed with. (not null)
     * @param pcjId - The PCJ whose metadata will be deleted. (not null)
     */
    private void deletePcjIdAndSparqlMetadata(final Transaction tx, final String pcjId) {
        requireNonNull(tx);
        requireNonNull(pcjId);

        final String queryId = getQueryIdFromPcjId(tx, pcjId);
        final String sparql = getSparqlFromQueryId(tx, queryId);
        tx.delete(queryId, FluoQueryColumns.RYA_PCJ_ID);
        tx.delete(sparql, FluoQueryColumns.QUERY_ID);
        tx.delete(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID);
    }

    /**
     * Deletes all results (BindingSets or Statements) associated with the
     * specified nodeId, in batches of at most {@code batchSize} rows per
     * transaction.
     *
     * @param client - Used to delete the data. (not null)
     * @param nodeId - nodeId whose {@link BindingSet}s will be deleted. (not null)
     */
    private void deleteData(final FluoClient client, final String nodeId) {
        requireNonNull(client);
        requireNonNull(nodeId);

        final NodeType type = NodeType.fromNodeId(nodeId).get();
        final Column resultColumn = type.getResultColumn();

        // Keep opening fresh transactions until a batch reports no more rows.
        boolean hasMoreResults = true;
        while (hasMoreResults) {
            try (Transaction tx = client.newTransaction()) {
                hasMoreResults = deleteDataBatch(tx, getIterator(tx, nodeId, resultColumn), resultColumn);
            }
        }
    }

    /**
     * Builds a scanner over every result row for {@code nodeId} in {@code column}.
     */
    private CellScanner getIterator(final Transaction tx, final String nodeId, final Column column) {
        requireNonNull(tx);
        requireNonNull(nodeId);
        requireNonNull(column);

        return tx.scanner().fetch(column).over(Span.prefix(nodeId)).build();
    }

    /**
     * Deletes up to {@code batchSize} rows seen by {@code scanner} and commits.
     * The caller owns the transaction and is responsible for closing it.
     *
     * @return {@code true} if more rows remain to be deleted; {@code false} otherwise.
     */
    private boolean deleteDataBatch(final Transaction tx, final CellScanner scanner, final Column column) {
        requireNonNull(tx);
        requireNonNull(scanner);
        requireNonNull(column);

        int count = 0;
        final Iterator<RowColumnValue> iter = scanner.iterator();
        while (iter.hasNext() && count < batchSize) {
            tx.delete(iter.next().getRow(), column);
            count++;
        }

        // Capture whether rows remain before committing, since the scanner is
        // backed by this transaction.
        final boolean hasNext = iter.hasNext();
        tx.commit();
        return hasNext;
    }

    /**
     * Looks up the Fluo query ID that is registered for a PCJ ID.
     *
     * @throws IllegalArgumentException if no query is registered for {@code pcjId}.
     */
    private String getQueryIdFromPcjId(final Transaction tx, final String pcjId) {
        requireNonNull(tx);
        requireNonNull(pcjId);

        final Bytes queryIdBytes = tx.get(Bytes.of(pcjId), FluoQueryColumns.PCJ_ID_QUERY_ID);
        if (queryIdBytes == null) {
            // Fail with a clear message instead of a raw NPE on toString().
            throw new IllegalArgumentException("No Fluo query is registered for PCJ ID: " + pcjId);
        }
        return queryIdBytes.toString();
    }

    /**
     * Reads the SPARQL string stored in the query metadata for {@code queryId}.
     */
    private String getSparqlFromQueryId(final Transaction tx, final String queryId) {
        requireNonNull(tx);
        requireNonNull(queryId);

        final QueryMetadata metadata = dao.readQueryMetadata(tx, queryId);
        return metadata.getSparql();
    }
}
*/ import java.io.UnsupportedEncodingException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.UUID; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java index d8d60b5..6642780 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/ConstructQueryResultUpdater.java @@ -23,12 +23,13 @@ import org.apache.fluo.api.client.TransactionBase; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; import org.apache.log4j.Logger; -import org.apache.rya.api.domain.RyaSchema; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaSubGraph; import org.apache.rya.indexing.pcj.fluo.app.export.kafka.RyaSubGraphKafkaSerDe; import org.apache.rya.indexing.pcj.fluo.app.query.ConstructQueryMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; +import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; /** @@ -53,39 +54,26 @@ public class ConstructQueryResultUpdater { public void updateConstructQueryResults(TransactionBase tx, VisibilityBindingSet bs, ConstructQueryMetadata metadata) { 
String nodeId = metadata.getNodeId(); + VariableOrder varOrder = metadata.getVariableOrder(); Column column = FluoQueryColumns.CONSTRUCT_STATEMENTS; ConstructGraph graph = metadata.getConstructGraph(); + String parentId = metadata.getParentNodeId(); - try { + // Create the Row Key for the emitted binding set. It does not contain visibilities. + final Bytes resultRow = RowKeyUtil.makeRowKey(nodeId, varOrder, bs); + + // If this is a new binding set, then emit it. + if(tx.get(resultRow, column) == null || varOrder.getVariableOrders().size() < bs.size()) { Set<RyaStatement> statements = graph.createGraphFromBindingSet(bs); - RyaSubGraph subgraph = new RyaSubGraph(metadata.getNodeId(), statements); - String resultId = nodeId + "_" + getSubGraphId(subgraph); - tx.set(Bytes.of(resultId), column, Bytes.of(serializer.toBytes(subgraph))); - } catch (Exception e) { - log.trace("Unable to serialize RyaStatement generated by ConstructGraph: " + graph + " from BindingSet: " + bs ); - } - } - - /** - * Generates a simple hash used as an id for the subgraph. Id generated as hash as opposed - * to UUID to avoid the same subgraph result being stored under multiple UUID. 
- * @param subgraph - subgraph that an id is need for - * @return - hash of subgraph used as an id - */ - private int getSubGraphId(RyaSubGraph subgraph) { - int id = 17; - id = 31*id + subgraph.getId().hashCode(); - for(RyaStatement statement: subgraph.getStatements()) { - int statementId = 7; - if(!statement.getSubject().getData().startsWith(RyaSchema.BNODE_NAMESPACE)) { - statementId = 17*statementId + statement.getSubject().hashCode(); - } - statementId = 17*statementId + statement.getPredicate().hashCode(); - statementId = 17*statementId + statement.getObject().hashCode(); - id += statementId; + RyaSubGraph subgraph = new RyaSubGraph(parentId, statements); + final Bytes nodeValueBytes = Bytes.of(serializer.toBytes(subgraph)); + + log.trace( + "Transaction ID: " + tx.getStartTimestamp() + "\n" + + "New Binding Set: " + subgraph + "\n"); + + tx.set(resultRow, column, nodeValueBytes); } - return Math.abs(id); } - } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java index 1c99051..7cfa216 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FilterResultUpdater.java @@ -25,7 +25,6 @@ import org.apache.fluo.api.data.Bytes; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.fluo.app.query.FilterMetadata; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; -import org.apache.rya.indexing.pcj.fluo.app.util.BindingSetUtil; import 
org.apache.rya.indexing.pcj.fluo.app.util.FilterSerializer; import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; @@ -46,8 +45,6 @@ import org.openrdf.query.algebra.evaluation.ValueExprEvaluationException; import org.openrdf.query.algebra.evaluation.impl.EvaluationStrategyImpl; import org.openrdf.query.algebra.evaluation.util.QueryEvaluationUtil; -import com.google.common.base.Optional; - import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; import info.aduna.iteration.CloseableIteration; @@ -114,24 +111,16 @@ public class FilterResultUpdater { // Evaluate whether the child BindingSet satisfies the filter's condition. final ValueExpr condition = filter.getCondition(); if (isTrue(condition, childBindingSet)) { - // Create the Filter's binding set from the child's. - final VariableOrder filterVarOrder = filterMetadata.getVariableOrder(); - final BindingSet filterBindingSet = BindingSetUtil.keepBindings(filterVarOrder, childBindingSet); // Create the Row Key for the emitted binding set. It does not contain visibilities. - final Bytes resultRow = RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, filterBindingSet); - - // If this is a new binding set, then emit it. 
- if(tx.get(resultRow, FluoQueryColumns.FILTER_BINDING_SET) == null) { - final VisibilityBindingSet visBindingSet = new VisibilityBindingSet(filterBindingSet, childBindingSet.getVisibility()); - final Bytes nodeValueBytes = BS_SERDE.serialize(visBindingSet); + final VariableOrder filterVarOrder = filterMetadata.getVariableOrder(); + final Bytes resultRow = RowKeyUtil.makeRowKey(filterMetadata.getNodeId(), filterVarOrder, childBindingSet); - log.trace( - "Transaction ID: " + tx.getStartTimestamp() + "\n" + - "New Binding Set: " + visBindingSet + "\n"); + // Serialize and emit BindingSet + final Bytes nodeValueBytes = BS_SERDE.serialize(childBindingSet); + log.trace("Transaction ID: " + tx.getStartTimestamp() + "\n" + "New Binding Set: " + childBindingSet + "\n"); - tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, nodeValueBytes); - } + tx.set(resultRow, FluoQueryColumns.FILTER_BINDING_SET, nodeValueBytes); } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/e387818b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java index 05a8d1c..43a36de 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java +++ b/extras/rya.pcj.fluo/pcj.fluo.app/src/main/java/org/apache/rya/indexing/pcj/fluo/app/FluoStringConverter.java @@ -23,8 +23,6 @@ import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.DE import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.TYPE_DELIM; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.URI_TYPE; -import java.util.UUID; - import 
edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull;
