Repository: incubator-rya Updated Branches: refs/heads/master 22bdc7a2e -> 78f958d2f
Closes #125; RYA-222-Fixed Column Visibility Bug for Results Streamed into Fluo Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/78f958d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/78f958d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/78f958d2 Branch: refs/heads/master Commit: 78f958d2ff31e2c45fa58b2ea7b226c5598e576e Parents: 22bdc7a Author: Caleb Meier <caleb.me...@parsons.com> Authored: Wed Nov 30 09:07:33 2016 -0800 Committer: pujav65 <puja...@gmail.com> Committed: Thu Dec 15 14:43:59 2016 -0500 ---------------------------------------------------------------------- .../api/client/accumulo/AccumuloCreatePCJ.java | 62 ++--- .../indexing/external/fluo/FluoPcjUpdater.java | 18 +- .../external/fluo/FluoPcjUpdaterSupplier.java | 16 +- .../benchmark/query/PCJOptimizerBenchmark.java | 18 ++ .../rya/indexing/pcj/fluo/api/CreatePcj.java | 256 +++++++++++-------- .../indexing/pcj/fluo/api/InsertTriples.java | 39 +++ .../fluo/client/command/NewQueryCommand.java | 15 +- .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 6 +- .../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 11 +- .../indexing/pcj/fluo/api/GetQueryReportIT.java | 5 +- .../pcj/fluo/integration/CreateDeleteIT.java | 2 +- .../indexing/pcj/fluo/integration/InputIT.java | 11 +- .../indexing/pcj/fluo/integration/QueryIT.java | 11 +- .../pcj/fluo/integration/RyaExportIT.java | 5 +- .../RyaInputIncrementalUpdateIT.java | 14 +- .../pcj/fluo/integration/StreamingTestIT.java | 2 +- .../HistoricStreamingVisibilityIT.java | 130 ++++++++++ .../pcj/fluo/visibility/PcjVisibilityIT.java | 21 +- 18 files changed, 422 insertions(+), 220 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java 
---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java index 80ece33..ac8da66 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java @@ -18,27 +18,10 @@ */ package org.apache.rya.api.client.accumulo; -import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Objects.requireNonNull; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.accumulo.core.client.Connector; -import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.repository.RepositoryException; -import org.openrdf.sail.SailException; - -import com.google.common.base.Optional; - import org.apache.fluo.api.client.FluoClient; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.accumulo.AccumuloRyaDAO; import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; import org.apache.rya.api.client.CreatePCJ; import org.apache.rya.api.client.GetInstanceDetails; @@ -54,8 +37,20 @@ import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryExce import org.apache.rya.api.instance.RyaDetailsUpdater; import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; import 
org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; -import org.apache.rya.rdftriplestore.RdfCloudTripleStore; -import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * An Accumulo implementation of the {@link CreatePCJ} command. @@ -108,7 +103,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); try { updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId); - } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { + } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) { throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); } @@ -138,7 +133,7 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { return pcjId; } - private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException { + private void updateFluoApp(final String ryaInstance, final String 
fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException { requireNonNull(pcjStorage); requireNonNull(pcjId); @@ -151,32 +146,9 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { cd.getZookeepers(), fluoAppName); - // Setup the Rya client that is able to talk to scan Rya's statements. - final RyaSailRepository ryaSailRepo = makeRyaRepository(getConnector(), ryaInstance); - // Initialize the PCJ within the Fluo application. final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj(); - fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaSailRepo); + fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance); } - private static RyaSailRepository makeRyaRepository(final Connector connector, final String ryaInstance) throws RepositoryException { - checkNotNull(connector); - checkNotNull(ryaInstance); - - // Setup Rya configuration values. - final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); - ryaConf.setTablePrefix( ryaInstance ); - - // Connect to the Rya repo using the provided Connector. 
- final AccumuloRyaDAO accumuloRyaDao = new AccumuloRyaDAO(); - accumuloRyaDao.setConnector(connector); - accumuloRyaDao.setConf(ryaConf); - - final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); - ryaStore.setRyaDAO(accumuloRyaDao); - - final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); - ryaRepo.initialize(); - return ryaRepo; - } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java b/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java index 39a3ca2..0e496ca 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdater.java @@ -22,17 +22,14 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.util.Collection; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - +import org.apache.fluo.api.client.FluoClient; import org.apache.log4j.Logger; +import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; -import com.google.common.base.Optional; - -import org.apache.fluo.api.client.FluoClient; -import org.apache.rya.api.domain.RyaStatement; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * Updates the PCJ indices by forwarding the statement additions/removals to @@ -47,24 +44,21 @@ public class FluoPcjUpdater implements PrecomputedJoinUpdater { private final FluoClient fluoClient; private final InsertTriples insertTriples = new InsertTriples(); - private 
final String statementVis; /** * Constructs an instance of {@link FluoPcjUpdater}. * * @param fluoClient - A connection to the Fluo table new statements will be * inserted into and deleted from. (not null) - * @param statementVis - The visibility label that will be applied to all * statements that are inserted via the Fluo PCJ updater. (not null) */ - public FluoPcjUpdater(final FluoClient fluoClient, final String statementVis) { + public FluoPcjUpdater(final FluoClient fluoClient) { this.fluoClient = checkNotNull(fluoClient); - this.statementVis = checkNotNull(statementVis); } @Override public void addStatements(final Collection<RyaStatement> statements) throws PcjUpdateException { - insertTriples.insert(fluoClient, statements, Optional.of(statementVis)); + insertTriples.insert(fluoClient, statements); } @Override http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java b/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java index 44a4b4a..7d2c181 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/external/fluo/FluoPcjUpdaterSupplier.java @@ -25,23 +25,21 @@ import static org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMUL import static org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_USERNAME; import static org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.ACCUMULO_ZOOKEEPERS; import static org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.FLUO_APP_NAME; -import static org.apache.rya.indexing.external.fluo.FluoPcjUpdaterConfig.STATEMENT_VISIBILITY; + import 
org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.client.FluoFactory; import org.apache.fluo.api.config.FluoConfiguration; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - +import org.apache.hadoop.conf.Configuration; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; - -import org.apache.hadoop.conf.Configuration; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; import com.google.common.base.Optional; import com.google.common.base.Supplier; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * Creates instances of {@link FluoPcjUpdater} using the values found in a {@link Configuration}. */ @@ -81,7 +79,6 @@ public class FluoPcjUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> checkArgument(fluoUpdaterConfig.getAccumuloInstance().isPresent(), "Missing configuration: " + ACCUMULO_INSTANCE); checkArgument(fluoUpdaterConfig.getAccumuloUsername().isPresent(), "Missing configuration: " + ACCUMULO_USERNAME); checkArgument(fluoUpdaterConfig.getAccumuloPassword().isPresent(), "Missing configuration: " + ACCUMULO_PASSWORD); - checkArgument(fluoUpdaterConfig.getStatementVisibility().isPresent(), "Missing configuration: " + STATEMENT_VISIBILITY); // Fluo configuration values. 
final FluoConfiguration fluoClientConfig = new FluoConfiguration(); @@ -95,7 +92,6 @@ public class FluoPcjUpdaterSupplier implements Supplier<PrecomputedJoinUpdater> fluoClientConfig.setAccumuloPassword( fluoUpdaterConfig.getAccumuloPassword().get() ); final FluoClient fluoClient = FluoFactory.newClient(fluoClientConfig); - final String statementVisibilities = fluoUpdaterConfig.getStatementVisibility().get(); - return new FluoPcjUpdater(fluoClient, statementVisibilities); + return new FluoPcjUpdater(fluoClient); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java index 38abf87..fd74e8b 100644 --- a/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java +++ b/extras/rya.benchmark/src/main/java/org/apache/rya/benchmark/query/PCJOptimizerBenchmark.java @@ -16,6 +16,24 @@ * specific language governing permissions and limitations * under the License. */ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.rya.benchmark.query; import static com.google.common.base.Preconditions.checkArgument; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java index 1259a01..6567371 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java @@ -21,14 +21,27 @@ package org.apache.rya.indexing.pcj.fluo.api; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import static java.util.Objects.requireNonNull; -import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; +import java.io.UnsupportedEncodingException; import java.util.HashSet; +import java.util.Iterator; +import java.util.List; import java.util.Set; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; 
+import org.apache.accumulo.core.security.Authorizations; +import org.apache.fluo.api.client.FluoClient; +import org.apache.fluo.api.client.Transaction; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.api.persist.query.BatchRyaQuery; +import org.apache.rya.api.resolver.RdfToRyaConversions; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQueryColumns; @@ -39,23 +52,18 @@ import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; -import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; -import org.openrdf.query.Binding; -import org.openrdf.query.BindingSet; +import org.openrdf.model.Resource; +import org.openrdf.model.URI; +import org.openrdf.model.Value; import org.openrdf.query.MalformedQueryException; import org.openrdf.query.QueryEvaluationException; import org.openrdf.query.algebra.StatementPattern; -import org.openrdf.query.impl.MapBindingSet; import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; -import org.openrdf.repository.sail.SailRepository; -import org.openrdf.sail.SailConnection; import org.openrdf.sail.SailException; -import info.aduna.iteration.CloseableIteration; -import org.apache.fluo.api.client.FluoClient; -import org.apache.fluo.api.client.Transaction; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; 
+import edu.umd.cs.findbugs.annotations.NonNull; /** * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. @@ -105,6 +113,7 @@ public class CreatePcj { this.spInsertBatchSize = spInsertBatchSize; } + /** * Tells the Fluo PCJ Updater application to maintain a new PCJ. * <p> @@ -116,110 +125,145 @@ public class CreatePcj { * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) * @param pcjStorage - Provides access to the PCJ index. (not null) * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) - * @param rya - A connection to the Rya instance hosting the PCJ, (not null) + * @param queryEngine - QueryEngine for a given Rya Instance, (not null) * * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. * @throws SailException Historic PCJ results could not be loaded because of a problem with {@code rya}. * @throws QueryEvaluationException Historic PCJ results could not be loaded because of a problem with {@code rya}. */ - public void withRyaIntegration( - final String pcjId, - final PrecomputedJoinStorage pcjStorage, - final FluoClient fluo, - final SailRepository rya) - throws MalformedQueryException, PcjException, SailException, QueryEvaluationException { - requireNonNull(pcjId); - requireNonNull(pcjStorage); - requireNonNull(fluo); - requireNonNull(rya); - - // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo. - // We use these IDs later when scanning Rya for historic Statement Pattern matches - // as well as setting up automatic exports. - final NodeIds nodeIds = new NodeIds(); - - // Parse the query's structure for the metadata that will be written to fluo. 
- final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); - final String sparql = pcjMetadata.getSparql(); - final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); - final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); - - try(Transaction tx = fluo.newTransaction()) { - // Write the query's structure to Fluo. - new FluoQueryMetadataDAO().write(tx, fluoQuery); - - // The results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ. - final String queryId = fluoQuery.getQueryMetadata().getNodeId(); - tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); - tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); - - // Flush the changes to Fluo. - tx.commit(); - } - - // Get a connection to Rya. It's used to scan for Statement Pattern results. - final SailConnection ryaConn = rya.getSail().getConnection(); - - // Reuse the same set object while performing batch inserts. - final Set<BindingSet> batch = new HashSet<>(); - - // Iterate through each of the statement patterns and insert their historic matches into Fluo. - for(final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) { - // Get an iterator over all of the binding sets that match the statement pattern. - final StatementPattern pattern = FluoStringConverter.toStatementPattern( patternMetadata.getStatementPattern() ); - final CloseableIteration<? extends BindingSet, QueryEvaluationException> bindingSets = ryaConn.evaluate(pattern, null, null, false); - - // Insert batches of the binding sets into Fluo. 
- while(bindingSets.hasNext()) { - if(batch.size() == spInsertBatchSize) { - writeBatch(fluo, patternMetadata, batch); - batch.clear(); - } - - batch.add( bindingSets.next() ); - } - - if(!batch.isEmpty()) { - writeBatch(fluo, patternMetadata, batch); - batch.clear(); - } - } - } + public void withRyaIntegration(final String pcjId, final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, + final Connector accumulo, String ryaInstance ) + throws MalformedQueryException, PcjException, SailException, QueryEvaluationException, RyaDAOException { + requireNonNull(pcjId); + requireNonNull(pcjStorage); + requireNonNull(fluo); + requireNonNull(accumulo); + requireNonNull(ryaInstance); + + //Create AccumuloRyaQueryEngine to query for historic results + AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(ryaInstance); + conf.setAuths(getAuths(accumulo)); + AccumuloRyaQueryEngine queryEngine = new AccumuloRyaQueryEngine(accumulo, conf); + - /** - * Writes a batch of {@link BindingSet}s that match a statement pattern to Fluo. - * - * @param fluo - Creates transactions to Fluo. (not null) - * @param spMetadata - The Statement Pattern the batch matches. (not null) - * @param batch - A set of binding sets that are the result of the statement pattern. (not null) - */ - private static void writeBatch(final FluoClient fluo, final StatementPatternMetadata spMetadata, final Set<BindingSet> batch) { - checkNotNull(fluo); - checkNotNull(spMetadata); - checkNotNull(batch); + // Keeps track of the IDs that are assigned to each of the query's nodes + // in Fluo. + // We use these IDs later when scanning Rya for historic Statement + // Pattern matches + // as well as setting up automatic exports. + final NodeIds nodeIds = new NodeIds(); + + // Parse the query's structure for the metadata that will be written to + // fluo. 
+ final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = pcjMetadata.getSparql(); + final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); + final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); + + try (Transaction tx = fluo.newTransaction()) { + // Write the query's structure to Fluo. + new FluoQueryMetadataDAO().write(tx, fluoQuery); + + // The results of the query are eventually exported to an instance + // of Rya, so store the Rya ID for the PCJ. + final String queryId = fluoQuery.getQueryMetadata().getNodeId(); + tx.set(queryId, FluoQueryColumns.RYA_PCJ_ID, pcjId); + tx.set(pcjId, FluoQueryColumns.PCJ_ID_QUERY_ID, queryId); - final BindingSetStringConverter converter = new BindingSetStringConverter(); + // Flush the changes to Fluo. + tx.commit(); + } - try(Transaction tx = fluo.newTransaction()) { - // Get the node's variable order. - final String spNodeId = spMetadata.getNodeId(); - final VariableOrder varOrder = spMetadata.getVariableOrder(); + // Reuse the same set object while performing batch inserts. + final Set<RyaStatement> queryBatch = new HashSet<>(); - for(final BindingSet bindingSet : batch) { - final MapBindingSet spBindingSet = new MapBindingSet(); - for(final String var : varOrder) { - final Binding binding = bindingSet.getBinding(var); - spBindingSet.addBinding(binding); - } + // Iterate through each of the statement patterns and insert their + // historic matches into Fluo. + for (final StatementPatternMetadata patternMetadata : fluoQuery.getStatementPatternMetadata()) { + // Get an iterator over all of the binding sets that match the + // statement pattern. 
+ final StatementPattern pattern = FluoStringConverter + .toStatementPattern(patternMetadata.getStatementPattern()); + queryBatch.add(spToRyaStatement(pattern)); + } - final String bindingSetStr = converter.convert(spBindingSet, varOrder); + Iterator<RyaStatement> triples = queryEngine.query(new BatchRyaQuery(queryBatch)).iterator(); + Set<RyaStatement> triplesBatch = new HashSet<>(); - // Write the binding set entry to Fluo for the statement pattern. - tx.set(spNodeId + NODEID_BS_DELIM + bindingSetStr, FluoQueryColumns.STATEMENT_PATTERN_BINDING_SET, bindingSetStr); - } + // Insert batches of the binding sets into Fluo. + while (triples.hasNext()) { + if (triplesBatch.size() == spInsertBatchSize) { + writeBatch(fluo, triplesBatch); + triplesBatch.clear(); + } - tx.commit(); - } + triplesBatch.add(triples.next()); + } + + if (!triplesBatch.isEmpty()) { + writeBatch(fluo, triplesBatch); + triplesBatch.clear(); + } + } + + + private static void writeBatch(final FluoClient fluo, final Set<RyaStatement> batch) { + checkNotNull(fluo); + checkNotNull(batch); + + new InsertTriples().insert(fluo, batch); + + } + + + private static RyaStatement spToRyaStatement(StatementPattern sp) { + + Value subjVal = sp.getSubjectVar().getValue(); + Value predVal = sp.getPredicateVar().getValue(); + Value objVal = sp.getObjectVar().getValue(); + + RyaURI subjURI = null; + RyaURI predURI = null; + RyaType objType = null; + + if(subjVal != null) { + if(!(subjVal instanceof Resource)) { + throw new AssertionError("Subject must be a Resource."); + } + subjURI = RdfToRyaConversions.convertResource((Resource) subjVal); + } + + if (predVal != null) { + if(!(predVal instanceof URI)) { + throw new AssertionError("Predicate must be a URI."); + } + predURI = RdfToRyaConversions.convertURI((URI) predVal); + } + + if (objVal != null ) { + objType = RdfToRyaConversions.convertValue(objVal); + } + + return new RyaStatement(subjURI, predURI, objType); } + + + private String[] getAuths(Connector accumulo) 
{ + Authorizations auths; + try { + auths = accumulo.securityOperations().getUserAuthorizations(accumulo.whoami()); + List<byte[]> authList = auths.getAuthorizations(); + String[] authArray = new String[authList.size()]; + for(int i = 0; i < authList.size(); i++){ + authArray[i] = new String(authList.get(i), "UTF-8"); + } + return authArray; + } catch (AccumuloException | AccumuloSecurityException | UnsupportedEncodingException e) { + throw new RuntimeException("Cannot read authorizations for user: " + accumulo.whoami()); + } + } + + } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java index 9312523..1e86836 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/InsertTriples.java @@ -88,6 +88,45 @@ public class InsertTriples { tx.commit(); } } + + /** + * Inserts a triple into Fluo. + * + * @param fluo - A connection to the Fluo table that will be updated. (not null) + * @param triple - The RyaStatement to insert. (not null) + */ + public void insert(final FluoClient fluo, final RyaStatement triple) { + checkNotNull(fluo); + checkNotNull(triple); + + insert(fluo, Collections.singleton(triple)); + } + + /** + * Insert a batch of RyaStatements into Fluo. + * + * @param fluo - A connection to the Fluo table that will be updated. (not null) + * @param triples - The triples to insert. 
(not null) + */ + public void insert(final FluoClient fluo, final Collection<RyaStatement> triples) { + checkNotNull(fluo); + checkNotNull(triples); + + try(Transaction tx = fluo.newTransaction()) { + for(final RyaStatement triple : triples) { + Optional<byte[]> visibility = Optional.fromNullable(triple.getColumnVisibility()); + try { + tx.set(Bytes.of(spoFormat(triple)), FluoQueryColumns.TRIPLES, Bytes.of(visibility.or(new byte[0]))); + } catch (final TripleRowResolverException e) { + log.error("Could not convert a Triple into the SPO format: " + triple); + } + } + + tx.commit(); + } + } + + /** * Converts a triple into a byte[] holding the Rya SPO representation of it. http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java index e612a07..43dac3c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java +++ b/extras/rya.pcj.fluo/pcj.fluo.client/src/main/java/org/apache/rya/indexing/pcj/fluo/client/command/NewQueryCommand.java @@ -21,17 +21,26 @@ package org.apache.rya.indexing.pcj.fluo.client.command; import static com.google.common.base.Preconditions.checkNotNull; import java.io.IOException; +import java.io.UnsupportedEncodingException; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.List; +import java.util.stream.Collectors; import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; +import org.apache.accumulo.core.client.AccumuloException; 
+import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.security.Authorizations; import org.apache.commons.io.IOUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; +import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.client.PcjAdminClientCommand; import org.apache.rya.indexing.pcj.fluo.client.util.ParsedQueryRequest; @@ -124,12 +133,14 @@ public class NewQueryCommand implements PcjAdminClientCommand { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo PCJ Updater app to maintain the PCJ. - createPcj.withRyaIntegration(pcjId, pcjStorage, fluo, rya); + createPcj.withRyaIntegration(pcjId, pcjStorage, fluo, accumulo, ryaTablePrefix); - } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { + } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) { throw new ExecutionException("Could not create and load historic matches into the the Fluo app for the query.", e); } log.trace("Finished executing the New Query Command."); } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java index 09f2854..105f697 100644 --- 
a/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java +++ b/extras/rya.pcj.fluo/pcj.fluo.demo/src/main/java/org/apache/rya/indexing/pcj/fluo/demo/FluoAndHistoricPcjsDemo.java @@ -47,9 +47,11 @@ import com.google.common.collect.Sets; import org.apache.fluo.api.client.FluoClient; import org.apache.fluo.api.mini.MiniFluo; +import org.apache.rya.accumulo.query.AccumuloRyaQueryEngine; import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.api.domain.RyaType; import org.apache.rya.api.domain.RyaURI; +import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.api.resolver.RyaToRdfConversions; import org.apache.rya.rdftriplestore.RyaSailRepository; @@ -177,9 +179,9 @@ public class FluoAndHistoricPcjsDemo implements Demo { pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain it. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, ryaTablePrefix); - } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { + } catch (MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) { throw new DemoExecutionException("Error while using Fluo to compute and export historic matches, so the demo can not continue. 
Exiting.", e); } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java index 94d974d..82b61bd 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetPcjMetadataIT.java @@ -26,6 +26,7 @@ import java.util.Set; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInAccumuloException; import org.apache.rya.indexing.pcj.fluo.api.GetPcjMetadata.NotInFluoException; @@ -49,7 +50,7 @@ import com.google.common.collect.Sets; public class GetPcjMetadataIT extends ITBase { @Test - public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException { + public void getMetadataByQueryId() throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, RyaDAOException { final String sparql = "SELECT ?x " + "WHERE { " + @@ -62,7 +63,7 @@ public class GetPcjMetadataIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. 
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Fetch the PCJ's Metadata through the GetPcjMetadata interactor. final String queryId = new ListQueryIds().listQueryIds(fluoClient).get(0); @@ -75,7 +76,7 @@ public class GetPcjMetadataIT extends ITBase { } @Test - public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException { + public void getAllMetadata() throws MalformedQueryException, SailException, QueryEvaluationException, PcjException, NotInFluoException, NotInAccumuloException, AccumuloException, AccumuloSecurityException, RyaDAOException { final CreatePcj createPcj = new CreatePcj(); @@ -89,7 +90,7 @@ public class GetPcjMetadataIT extends ITBase { "?x <http://worksAt> <http://Chipotle>." + "}"; final String q1PcjId = pcjStorage.createPcj(q1Sparql); - createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, ryaRepo); + createPcj.withRyaIntegration(q1PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); final String q2Sparql = "SELECT ?x ?y " + @@ -98,7 +99,7 @@ public class GetPcjMetadataIT extends ITBase { "?y <http://worksAt> <http://Chipotle>." + "}"; final String q2PcjId = pcjStorage.createPcj(q2Sparql); - createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, ryaRepo); + createPcj.withRyaIntegration(q2PcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Ensure the command returns the correct metadata. 
final Set<PcjMetadata> expected = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java index 0fe44bd..85c31a0 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/api/GetQueryReportIT.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; +import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.GetQueryReport.QueryReport; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; @@ -38,8 +39,6 @@ import org.junit.Test; import com.google.common.base.Optional; import com.google.common.collect.Sets; -import org.apache.rya.api.domain.RyaStatement; - /** * Integration tests the methods of {@link GetQueryReportl}. */ @@ -79,7 +78,7 @@ public class GetQueryReportIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Stream the data into Fluo. 
new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index d2ff98c..b4c8d69 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -84,7 +84,7 @@ public class CreateDeleteIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Verify the end results of the query match the expected results. 
fluo.waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java index d0f6b21..dcab997 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/InputIT.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import java.util.HashSet; import java.util.Set; +import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; @@ -38,8 +39,6 @@ import org.openrdf.query.impl.BindingImpl; import com.google.common.base.Optional; import com.google.common.collect.Sets; -import org.apache.rya.api.domain.RyaStatement; - /** * Performs integration tests over the Fluo application geared towards various types of input. * <p> @@ -90,7 +89,7 @@ public class InputIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Verify the end results of the query match the expected results. fluo.waitForObservers(); @@ -137,7 +136,7 @@ public class InputIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. 
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Ensure the query has no results yet. fluo.waitForObservers(); @@ -189,7 +188,7 @@ public class InputIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Ensure Alice is a match. fluo.waitForObservers(); @@ -254,7 +253,7 @@ public class InputIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Ensure Alice is a match. fluo.waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 57b679a..4e9f265 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import java.util.HashSet; import java.util.Set; +import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.ITBase; import 
org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; @@ -38,8 +39,6 @@ import org.openrdf.query.impl.BindingImpl; import com.google.common.base.Optional; import com.google.common.collect.Sets; -import org.apache.rya.api.domain.RyaStatement; - /** * Performs integration tests over the Fluo application geared towards various query structures. * <p> @@ -84,7 +83,7 @@ public class QueryIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); @@ -170,7 +169,7 @@ public class QueryIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); @@ -235,7 +234,7 @@ public class QueryIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); @@ -283,7 +282,7 @@ public class QueryIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. 
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java index 07b4640..f3f486c 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaExportIT.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertEquals; import java.util.HashSet; import java.util.Set; +import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; @@ -36,8 +37,6 @@ import org.openrdf.query.impl.BindingImpl; import com.google.common.base.Optional; import com.google.common.collect.Sets; -import org.apache.rya.api.domain.RyaStatement; - /** * Performs integration tests over the Fluo application geared towards Rya PCJ exporting. * <p> @@ -99,7 +98,7 @@ public class RyaExportIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. 
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Stream the data into Fluo. new InsertTriples().insert(fluoClient, streamedTriples, Optional.<String>absent()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java index 9056c99..fd70a19 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/RyaInputIncrementalUpdateIT.java @@ -23,9 +23,11 @@ import static org.junit.Assert.assertEquals; import java.util.HashSet; import java.util.Set; +import org.apache.fluo.api.client.FluoClient; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.indexing.external.PrecomputedJoinIndexer; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; -import org.apache.rya.indexing.pcj.fluo.app.IncUpdateDAO; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.update.PrecomputedJoinUpdater; @@ -38,10 +40,6 @@ import org.openrdf.repository.RepositoryConnection; import com.google.common.collect.Sets; -import org.apache.fluo.api.client.FluoClient; -import org.apache.rya.accumulo.AccumuloRyaDAO; 
-import org.apache.rya.indexing.external.PrecomputedJoinIndexer; - /** * This test ensures that the correct updates are pushed by Fluo @@ -91,7 +89,7 @@ public class RyaInputIncrementalUpdateIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Verify the end results of the query match the expected results. fluo.waitForObservers(); @@ -142,7 +140,7 @@ public class RyaInputIncrementalUpdateIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); fluo.waitForObservers(); @@ -191,7 +189,7 @@ public class RyaInputIncrementalUpdateIT extends ITBase { final String pcjId = pcjStorage.createPcj(sparql); // Tell the Fluo app to maintain the PCJ. 
- new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); fluo.waitForObservers(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java index 2573925..29ef8f7 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/StreamingTestIT.java @@ -98,7 +98,7 @@ public class StreamingTestIT extends ITBase { // Create the PCJ table. 
final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); final String pcjId = pcjStorage.createPcj(pcj); - new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); String tableName = RYA_INSTANCE_NAME + "INDEX_" + pcjId; return tableName; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java new file mode 100644 index 0000000..6f4596f --- /dev/null +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/HistoricStreamingVisibilityIT.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.fluo.visibility; + +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.security.Authorizations; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.accumulo.AccumuloRyaDAO; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RdfToRyaConversions; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.pcj.fluo.ITBase; +import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.junit.Assert; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.BindingImpl; + +import com.google.common.collect.Sets; + +/** + * Performs integration tests over the Fluo application geared towards various types of input. + * <p> + * These tests are being ignored so that they will not run as unit tests while building the application. + */ +public class HistoricStreamingVisibilityIT extends ITBase { + + /** + * Ensure historic matches are included in the result. + */ + @Test + public void historicResults() throws Exception { + // A query that finds people who talk to Eve and work at Chipotle. + final String sparql = + "SELECT ?x " + + "WHERE { " + + "?x <http://talksTo> <http://Eve>. " + + "?x <http://worksAt> <http://Chipotle>." + + "}"; + + accumuloConn.securityOperations().changeUserAuthorizations(ACCUMULO_USER, new Authorizations("U","V","W")); + AccumuloRyaDAO dao = new AccumuloRyaDAO(); + dao.setConnector(accumuloConn); + dao.setConf(makeConfig()); + dao.init(); + + // Triples that are loaded into Rya before the PCJ is created. 
+ final Set<RyaStatement> historicTriples = Sets.newHashSet( + makeRyaStatement(makeStatement("http://Alice", "http://talksTo", "http://Eve"),"U"), + makeRyaStatement(makeStatement("http://Bob", "http://talksTo", "http://Eve"),"V"), + makeRyaStatement(makeStatement("http://Charlie", "http://talksTo", "http://Eve"),"W"), + + makeRyaStatement(makeStatement("http://Eve", "http://helps", "http://Kevin"), "U"), + + makeRyaStatement(makeStatement("http://Bob", "http://worksAt", "http://Chipotle"), "W"), + makeRyaStatement(makeStatement("http://Charlie", "http://worksAt", "http://Chipotle"), "V"), + makeRyaStatement(makeStatement("http://Eve", "http://worksAt", "http://Chipotle"), "U"), + makeRyaStatement(makeStatement("http://David", "http://worksAt", "http://Chipotle"), "V")); + + dao.add(historicTriples.iterator()); + dao.flush(); + + // The expected results of the SPARQL query once the PCJ has been computed. + final Set<BindingSet> expected = new HashSet<>(); + expected.add(makeBindingSet( + new BindingImpl("x", new URIImpl("http://Bob")))); + expected.add(makeBindingSet( + new BindingImpl("x", new URIImpl("http://Charlie")))); + + // Create the PCJ table. + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); + final String pcjId = pcjStorage.createPcj(sparql); + + new CreatePcj().withRyaIntegration(pcjId, pcjStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); + + // Verify the end results of the query match the expected results. + fluo.waitForObservers(); + Set<BindingSet> results = Sets.newHashSet(pcjStorage.listResults(pcjId)); + Assert.assertEquals(expected, results); + } + + + private AccumuloRdfConfiguration makeConfig() { + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(RYA_INSTANCE_NAME); + // Accumulo connection information. 
+ conf.set(ConfigUtils.CLOUDBASE_USER, ACCUMULO_USER); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, ACCUMULO_PASSWORD); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, instanceName); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, zookeepers); + conf.set(RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH, "U,V,W"); + + return conf; + } + + + private static RyaStatement makeRyaStatement(Statement statement, String visibility) throws UnsupportedEncodingException { + + RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement); + ryaStatement.setColumnVisibility(visibility.getBytes("UTF-8")); + return ryaStatement; + + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/78f958d2/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index e799ddd..ccc2c20 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -38,12 +38,21 @@ import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; import org.apache.hadoop.io.Text; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.RdfCloudTripleStoreConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; +import org.apache.rya.api.domain.RyaStatement; 
+import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.pcj.fluo.ITBase; import org.apache.rya.indexing.pcj.fluo.api.CreatePcj; import org.apache.rya.indexing.pcj.fluo.api.InsertTriples; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.apache.rya.sail.config.RyaSailFactory; import org.junit.Test; import org.openrdf.model.URI; import org.openrdf.model.ValueFactory; @@ -57,15 +66,6 @@ import org.openrdf.sail.Sail; import com.beust.jcommander.internal.Sets; import com.google.common.base.Optional; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.api.client.RyaClient; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; -import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; -import org.apache.rya.api.domain.RyaStatement; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.rdftriplestore.RyaSailRepository; -import org.apache.rya.sail.config.RyaSailFactory; - /** * Integration tests that ensure the Fluo Application properly exports PCJ * results with the correct Visibility values. @@ -110,6 +110,7 @@ public class PcjVisibilityIT extends ITBase { // PCJ updating application will have to maintain visibilities. final AccumuloRdfConfiguration ryaConf = super.makeConfig(instanceName, zookeepers); ryaConf.set(ConfigUtils.CLOUDBASE_AUTHS, "u"); + ryaConf.set(RdfCloudTripleStoreConfiguration.CONF_CV, "u"); Sail sail = null; RyaSailRepository ryaRepo = null; @@ -195,7 +196,7 @@ public class PcjVisibilityIT extends ITBase { final String pcjId = rootStorage.createPcj(sparql); // Create the PCJ in Fluo. 
- new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, ryaRepo); + new CreatePcj().withRyaIntegration(pcjId, rootStorage, fluoClient, accumuloConn, RYA_INSTANCE_NAME); // Stream the data into Fluo. for(final RyaStatement statement : streamedTriples.keySet()) {