/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.pcj.storage.accumulo;

import static java.util.Objects.requireNonNull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import javax.annotation.ParametersAreNonnullByDefault;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.rya.indexing.pcj.storage.PCJIdFactory;
import org.apache.rya.indexing.pcj.storage.PcjMetadata;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.openrdf.query.BindingSet;
import org.openrdf.query.MalformedQueryException;

import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
import mvm.rya.api.instance.RyaDetails;
import mvm.rya.api.instance.RyaDetails.PCJIndexDetails;
import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails;
import mvm.rya.api.instance.RyaDetailsRepository;
import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
import mvm.rya.api.instance.RyaDetailsUpdater;
import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator;
import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException;

/**
 * An Accumulo backed implementation of {@link PrecomputedJoinStorage}.
 * <p>
 * Each PCJ's results are stored in their own Accumulo table, and the set of
 * PCJs that exist for a Rya instance is tracked within that instance's
 * {@link RyaDetails}. The two are kept in sync by this class: create/drop
 * operations first update the details, then the backing table.
 */
@ParametersAreNonnullByDefault
public class AccumuloPcjStorage implements PrecomputedJoinStorage {

    // Factories that are used to create new PCJs.
    private final PCJIdFactory pcjIdFactory = new PCJIdFactory();
    private final PcjTableNameFactory pcjTableNameFactory = new PcjTableNameFactory();
    private final PcjVarOrderFactory pcjVarOrderFactory = new ShiftVarOrderFactory();

    // Objects used to interact with the PCJ tables associated with an instance of Rya.
    private final Connector accumuloConn;
    private final String ryaInstanceName;
    private final PcjTables pcjTables = new PcjTables();

    // Used to update the instance's metadata.
    private final RyaDetailsRepository ryaDetailsRepo;

    /**
     * Constructs an instance of {@link AccumuloPcjStorage}.
     *
     * @param accumuloConn - The connector that will be used to connect to Accumulo. (not null)
     * @param ryaInstanceName - The name of the RYA instance that will be accessed. (not null)
     */
    public AccumuloPcjStorage(final Connector accumuloConn, final String ryaInstanceName) {
        this.accumuloConn = requireNonNull(accumuloConn);
        this.ryaInstanceName = requireNonNull(ryaInstanceName);
        ryaDetailsRepo = new AccumuloRyaInstanceDetailsRepository(accumuloConn, ryaInstanceName);
    }

    @Override
    public List<String> listPcjs() throws PCJStorageException {
        try {
            final RyaDetails details = ryaDetailsRepo.getRyaInstanceDetails();
            final PCJIndexDetails pcjIndexDetails = details.getPCJIndexDetails();
            // The PCJ IDs are the keys of the instance's PCJDetails map.
            return new ArrayList<>( pcjIndexDetails.getPCJDetails().keySet() );
        } catch (final RyaDetailsRepositoryException e) {
            // NOTE(review): the original message here was a copy-paste from a
            // different operation; it now names the operation that failed.
            throw new PCJStorageException(String.format("Could not list the PCJs of Rya instance '%s' " +
                    "because the instance's details could not be fetched.", ryaInstanceName), e);
        }
    }

    @Override
    public String createPcj(final String sparql) throws PCJStorageException {
        requireNonNull(sparql);

        // Create the variable orders that will be used within Accumulo to store the PCJ.
        final Set<VariableOrder> varOrders;
        try {
            varOrders = pcjVarOrderFactory.makeVarOrders(sparql);
        } catch (final MalformedQueryException e) {
            throw new PCJStorageException("Can not create the PCJ. The SPARQL is malformed.", e);
        }

        // Update the Rya Details for this instance to include the new PCJ table.
        final String pcjId = pcjIdFactory.nextId();
        try {
            new RyaDetailsUpdater(ryaDetailsRepo).update(
                    new RyaDetailsMutator() {
                        @Override
                        public RyaDetails mutate(final RyaDetails originalDetails) {
                            // Create the new PCJ's details.
                            final PCJDetails.Builder newPcjDetails = PCJDetails.builder().setId( pcjId );

                            // Add them to the instance's details.
                            final RyaDetails.Builder mutated = RyaDetails.builder(originalDetails);
                            mutated.getPCJIndexDetails().addPCJDetails( newPcjDetails );
                            return mutated.build();
                        }
                    });
        } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
            throw new PCJStorageException(String.format("Could not create a new PCJ for Rya instance '%s' " +
                    "because of a problem while updating the instance's details.", ryaInstanceName), e);
        }

        // Create the table that will hold the PCJ's results.
        // NOTE(review): if this step fails, the details registered above are
        // left without a backing table — TODO confirm how callers recover.
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        pcjTables.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql);
        return pcjId;
    }

    @Override
    public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException {
        requireNonNull(pcjId);
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        return pcjTables.getPcjMetadata(accumuloConn, pcjTableName);
    }

    @Override
    public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) throws PCJStorageException {
        requireNonNull(pcjId);
        requireNonNull(results);
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        pcjTables.addResults(accumuloConn, pcjTableName, results);
    }

    @Override
    public Iterable<BindingSet> listResults(final String pcjId) throws PCJStorageException {
        requireNonNull(pcjId);

        try {
            // Fetch my authorizations so the scan only returns results I may see.
            final String myUsername = accumuloConn.whoami();
            final Authorizations myAuths = accumuloConn.securityOperations().getUserAuthorizations( myUsername );

            // Scan the PCJ table.
            final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
            return pcjTables.listResults(accumuloConn, pcjTableName, myAuths);

        } catch (AccumuloException | AccumuloSecurityException e) {
            throw new PCJStorageException("Could not list the results because I can not look up my Authorizations.", e);
        }
    }

    @Override
    public void purge(final String pcjId) throws PCJStorageException {
        requireNonNull(pcjId);
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        pcjTables.purgePcjTable(accumuloConn, pcjTableName);
    }

    @Override
    public void dropPcj(final String pcjId) throws PCJStorageException {
        requireNonNull(pcjId);

        // Update the Rya Details for this instance to no longer include the PCJ.
        try {
            new RyaDetailsUpdater(ryaDetailsRepo).update(
                    new RyaDetailsMutator() {
                        @Override
                        public RyaDetails mutate(final RyaDetails originalDetails) {
                            // Drop the PCJ's metadata from the instance's metadata.
                            final RyaDetails.Builder mutated = RyaDetails.builder(originalDetails);
                            mutated.getPCJIndexDetails().removePCJDetails(pcjId);
                            return mutated.build();
                        }
                    });
        } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) {
            throw new PCJStorageException(String.format("Could not drop an existing PCJ for Rya instance '%s' " +
                    "because of a problem while updating the instance's details.", ryaInstanceName), e);
        }

        // Delete the table that holds the PCJ's results.
        final String pcjTableName = pcjTableNameFactory.makeTableName(ryaInstanceName, pcjId);
        pcjTables.dropPcjTable(accumuloConn, pcjTableName);
    }

    @Override
    public void close() throws PCJStorageException {
        // Accumulo Connectors don't require closing.
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java index 248f724..588792b 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactory.java @@ -20,12 +20,13 @@ package org.apache.rya.indexing.pcj.storage.accumulo; import static java.util.Objects.requireNonNull; -import java.util.UUID; +import javax.annotation.ParametersAreNonnullByDefault; /** * Creates Accumulo table names that may be recognized by Rya as a table that * holds the results of a Precomputed Join. */ +@ParametersAreNonnullByDefault public class PcjTableNameFactory { /** @@ -46,30 +47,20 @@ public class PcjTableNameFactory { * query that is being precomputed. Here's an example of what a table name * may look like: * <pre> - * demo_INDEX_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9 + * demo_INDEX_c8f5367c16604210a7cb681ed004d2d9 * </pre> * The "demo_INDEX" portion indicates this table is a PCJ table for the "demo_" - * instance of Rya. The "_QUERY:c8f5367c-1660-4210-a7cb-681ed004d2d9" portion - * could be anything at all that uniquely identifies the query that is being updated. + * instance of Rya. The "c8f5367c16604210a7cb681ed004d2d9" portion could be + * anything at all that uniquely identifies the query that is being updated. * - * @param tablePrefix - The Rya instance's table prefix. (not null) - * @param uniqueId - The unique portion of the Rya PCJ table name. 
(not null) + * @param ryaInstance - The Rya instance's table prefix. (not null) + * @param pcjId - The ID of the PCJ the table is for. (not null) * @return A Rya PCJ table name built using the provided values. */ - public String makeTableName(final String tablePrefix, final String uniqueId) { - return tablePrefix + "INDEX_" + uniqueId; - } - - /** - * Invokes {@link #makeTableName(String, String)} with a randomly generated - * UUID as the {@code uniqueId}. - * - * @param tablePrefix - The Rya instance's table prefix. (not null) - * @return A Rya PCJ table name built using the provided values. - */ - public String makeTableName(final String tablePrefix) { - final String uniqueId = UUID.randomUUID().toString().replaceAll("-", ""); - return makeTableName(tablePrefix, uniqueId); + public String makeTableName(final String ryaInstance, final String pcjId) { + requireNonNull(ryaInstance); + requireNonNull(pcjId); + return ryaInstance + "INDEX_" + pcjId.toString().replaceAll("-", ""); } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java index 81078fa..c29cd2e 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java @@ -19,6 +19,7 @@ package org.apache.rya.indexing.pcj.storage.accumulo; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import java.util.ArrayList; import java.util.Collection; @@ -299,6 +300,42 @@ public class PcjTables { } /** + * Get an {@link 
Iterator} over the {@link BindingSet}s that are stored in the PCJ table. + * + * @param accumuloConn - A connection to the Accumulo that hsots the PCJ table. (not null) + * @param pcjTableName - The name of the PCJ table that will be scanned. (not null) + * @param auths - the user's authorizations that will be used to scan the table. (not null) + * @return An iterator over all of the {@link BindingSet}s that are stored as + * results for the PCJ. + * @throws PCJStorageException The binding sets could not be fetched. + */ + public Iterable<BindingSet> listResults(final Connector accumuloConn, final String pcjTableName, final Authorizations auths) throws PCJStorageException { + requireNonNull(pcjTableName); + + // Fetch the Variable Orders for the binding sets and choose one of them. It + // doesn't matter which one we choose because they all result in the same output. + final PcjMetadata metadata = getPcjMetadata(accumuloConn, pcjTableName); + final VariableOrder varOrder = metadata.getVarOrders().iterator().next(); + + try { + // Fetch only the Binding Sets whose Variable Order matches the selected one. + final Scanner scanner = accumuloConn.createScanner(pcjTableName, auths); + scanner.fetchColumnFamily( new Text(varOrder.toString()) ); + + // Return an Iterator that uses that scanner. + return new Iterable<BindingSet>() { + @Override + public Iterator<BindingSet> iterator() { + return new ScannerBindingSetIterator(scanner, varOrder); + } + }; + + } catch (final TableNotFoundException e) { + throw new PCJStorageException(String.format("PCJ Table does not exist for name '%s'.", pcjTableName), e); + } + } + + /** * Add a collection of results to a specific PCJ table. * * @param accumuloConn - A connection to the Accumulo that hosts the PCJ table. (not null) @@ -383,119 +420,91 @@ public class PcjTables { * @param delta - How much the cardinality will change. * @throws PCJStorageException The cardinality could not be updated. 
*/ - private void updateCardinality(final Connector accumuloConn, - final String pcjTableName, final long delta) - throws PCJStorageException { - checkNotNull(accumuloConn); - checkNotNull(pcjTableName); - - ConditionalWriter conditionalWriter = null; - try { - conditionalWriter = accumuloConn.createConditionalWriter( - pcjTableName, new ConditionalWriterConfig()); - - boolean updated = false; - while (!updated) { - // Write the conditional update request to Accumulo. - final long cardinality = getPcjMetadata(accumuloConn, - pcjTableName).getCardinality(); - final ConditionalMutation mutation = makeUpdateCardinalityMutation( - cardinality, delta); - final ConditionalWriter.Result result = conditionalWriter - .write(mutation); - - // Interpret the result. - switch (result.getStatus()) { - case ACCEPTED: - updated = true; - break; - case REJECTED: - break; - case UNKNOWN: - // We do not know if the mutation succeeded. At best, we - // can hope the metadata hasn't been updated - // since we originally fetched it and try again. - // Otherwise, continue forwards as if it worked. It's - // okay if this number is slightly off. 
- final long newCardinality = getPcjMetadata(accumuloConn, - pcjTableName).getCardinality(); - if (newCardinality != cardinality) { - updated = true; - } - break; - case VIOLATED: - throw new PCJStorageException( - "The cardinality could not be updated because the commit violated a table constraint."); - case INVISIBLE_VISIBILITY: - throw new PCJStorageException( - "The condition contains a visibility the updater can not satisfy."); - } - } - } catch (AccumuloException | AccumuloSecurityException - | TableNotFoundException e) { - throw new PCJStorageException( - "Could not update the cardinality value of the PCJ Table named: " - + pcjTableName, e); - } finally { - if (conditionalWriter != null) { - conditionalWriter.close(); - } - } - - } - + private void updateCardinality(final Connector accumuloConn, final String pcjTableName, final long delta) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + ConditionalWriter conditionalWriter = null; + try { + conditionalWriter = accumuloConn.createConditionalWriter(pcjTableName, new ConditionalWriterConfig()); + + boolean updated = false; + while (!updated) { + // Write the conditional update request to Accumulo. + final long cardinality = getPcjMetadata(accumuloConn,pcjTableName).getCardinality(); + final ConditionalMutation mutation = makeUpdateCardinalityMutation(cardinality, delta); + final ConditionalWriter.Result result = conditionalWriter.write(mutation); + + // Interpret the result. + switch (result.getStatus()) { + case ACCEPTED: + updated = true; + break; + case REJECTED: + break; + case UNKNOWN: + // We do not know if the mutation succeeded. At best, we + // can hope the metadata hasn't been updated + // since we originally fetched it and try again. + // Otherwise, continue forwards as if it worked. It's + // okay if this number is slightly off. 
+ final long newCardinality = getPcjMetadata(accumuloConn,pcjTableName).getCardinality(); + if (newCardinality != cardinality) { + updated = true; + } + break; + case VIOLATED: + throw new PCJStorageException("The cardinality could not be updated because the commit violated a table constraint."); + case INVISIBLE_VISIBILITY: + throw new PCJStorageException("The condition contains a visibility the updater can not satisfy."); + } + } + } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { + throw new PCJStorageException("Could not update the cardinality value of the PCJ Table named: " + pcjTableName, e); + } finally { + if (conditionalWriter != null) { + conditionalWriter.close(); + } + } + } /** * Update the cardinality of a PCJ by a {@code delta}. * - *This method updates the PCJ table cardinality using a BatchWriter in the event that - *the Accumulo Connector is for a MockInstance. In the event that the cardinality is - *being updated asynchronously, there are no guarantees that the resulting cardinality - *will be correct. + * This method updates the PCJ table cardinality using a BatchWriter in the event that + * the Accumulo Connector is for a MockInstance. In the event that the cardinality is + * being updated asynchronously, there are no guarantees that the resulting cardinality + * will be correct. * * @param accumuloConn - A connection to a Mock Accumulo Instance that hosts the PCJ table. (not null) * @param pcjTableName - The name of the PCJ table that will have its cardinality updated. (not null) * @param delta - How much the cardinality will change. * @throws PCJStorageException The cardinality could not be updated. 
*/ - private void updateMockCardinality(final Connector accumuloConn, - final String pcjTableName, final long delta) - throws PCJStorageException { - checkNotNull(accumuloConn); - checkNotNull(pcjTableName); - - BatchWriter batchWriter = null; - try { - batchWriter = accumuloConn.createBatchWriter(pcjTableName, - new BatchWriterConfig()); - final long cardinality = getPcjMetadata(accumuloConn, pcjTableName) - .getCardinality(); - final Mutation mutation = new Mutation(PCJ_METADATA_ROW_ID); - final Value newCardinality = new Value( - longLexicoder.encode(cardinality + delta)); - mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, - newCardinality); - batchWriter.addMutation(mutation); - } catch (TableNotFoundException | MutationsRejectedException e) { - throw new PCJStorageException( - "Could not update the cardinality value of the PCJ Table named: " - + pcjTableName, e); - } finally { - if (batchWriter != null) { - try { - batchWriter.close(); - } catch (MutationsRejectedException e) { - throw new PCJStorageException( - "Could not update the cardinality value of the PCJ Table named: " - + pcjTableName, e); - } - } - } - } - - + private void updateMockCardinality(final Connector accumuloConn, final String pcjTableName, final long delta) throws PCJStorageException { + checkNotNull(accumuloConn); + checkNotNull(pcjTableName); + BatchWriter batchWriter = null; + try { + batchWriter = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig()); + final long cardinality = getPcjMetadata(accumuloConn, pcjTableName).getCardinality(); + final Mutation mutation = new Mutation(PCJ_METADATA_ROW_ID); + final Value newCardinality = new Value(longLexicoder.encode(cardinality + delta)); + mutation.put(PCJ_METADATA_FAMILY, PCJ_METADATA_CARDINALITY, newCardinality); + batchWriter.addMutation(mutation); + } catch (TableNotFoundException | MutationsRejectedException e) { + throw new PCJStorageException("Could not update the cardinality value of the PCJ Table named: " 
+ pcjTableName, e); + } finally { + if (batchWriter != null) { + try { + batchWriter.close(); + } catch (final MutationsRejectedException e) { + throw new PCJStorageException("Could not update the cardinality value of the PCJ Table named: " + pcjTableName, e); + } + } + } + } /** * Creates a {@link ConditionalMutation} that only updates the cardinality http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java index f806d6e..00b4c99 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjVarOrderFactory.java @@ -20,13 +20,27 @@ package org.apache.rya.indexing.pcj.storage.accumulo; import java.util.Set; +import javax.annotation.ParametersAreNonnullByDefault; + +import org.openrdf.query.MalformedQueryException; + /** * Create alternative variable orders for a SPARQL query based on * the original ordering of its results. */ +@ParametersAreNonnullByDefault public interface PcjVarOrderFactory { /** + * Create a set of variable orders for a SPARQL query. + * + * @param sparql - The SPARQL query the variable orders will be derived from. (not null) + * @return @return A set of variable orders for the SPARQL query. + * @throws MalformedQueryException The SPARQL query was malformed and could not be parsed. + */ + public Set<VariableOrder> makeVarOrders(String sparql) throws MalformedQueryException; + + /** * Create alternative variable orders for a SPARQL query based on * the original ordering of its results. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.pcj.storage.accumulo;

import static java.util.Objects.requireNonNull;

import java.util.Iterator;
import java.util.Map.Entry;

import javax.annotation.ParametersAreNonnullByDefault;

import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException;
import org.openrdf.query.BindingSet;

/**
 * Iterates over the results of a {@link Scanner} assuming the results are
 * binding sets that can be converted using a {@link AccumuloPcjSerializer}.
 * <p>
 * Each Accumulo entry is lazily deserialized as it is requested; the entry's
 * row bytes are interpreted using the supplied {@link VariableOrder}.
 */
@ParametersAreNonnullByDefault
public class ScannerBindingSetIterator implements Iterator<BindingSet> {

    private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer();

    // The raw Accumulo entries that back this iterator.
    private final Iterator<Entry<Key, Value>> entries;
    // The variable order the scanned binding sets were serialized with.
    private final VariableOrder varOrder;

    /**
     * Constructs an instance of {@link ScannerBindingSetIterator}.
     *
     * @param scanner - The scanner whose results will be iterated over. (not null)
     * @param varOrder - The variable order of the binding sets the scanner returns. (not null)
     */
    public ScannerBindingSetIterator(final Scanner scanner, final VariableOrder varOrder) {
        requireNonNull(scanner);
        entries = scanner.iterator();
        this.varOrder = requireNonNull(varOrder);
    }

    @Override
    public boolean hasNext() {
        return entries.hasNext();
    }

    @Override
    public BindingSet next() {
        // The serialized binding set is stored as the entry's row id.
        final Entry<Key, Value> nextEntry = entries.next();
        final byte[] rowBytes = nextEntry.getKey().getRow().getBytes();
        try {
            return converter.convert(rowBytes, varOrder);
        } catch (final BindingSetConversionException e) {
            throw new RuntimeException("Could not deserialize a BindingSet from Accumulo.", e);
        }
    }
}
PcjVarOrderFactory { + + @Override + public Set<VariableOrder> makeVarOrders(final String sparql) throws MalformedQueryException { + requireNonNull(sparql); + + final Set<String> bindingNames = new SPARQLParser().parseQuery(sparql, null) + .getTupleExpr() + .getBindingNames(); + + return makeVarOrders( new VariableOrder(bindingNames) ); + } + @Override public Set<VariableOrder> makeVarOrders(final VariableOrder varOrder) { final Set<VariableOrder> varOrders = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactoryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactoryTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactoryTest.java new file mode 100644 index 0000000..2ad5ce5 --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTableNameFactoryTest.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +/** + * Tests the methods of {@link PcjTableNameFactory}. + */ +public class PcjTableNameFactoryTest { + + @Test + public void makeTableName() { + final String ryaInstance = "testInstance_"; + final String pcjId = "2dda1b099d264f16b1da8f9409c104d3"; + + // Create the Accumulo PCJ table name. + final PcjTableNameFactory factory = new PcjTableNameFactory(); + final String tableName = factory.makeTableName(ryaInstance, pcjId); + + // Ensure the table name matches the expected name. + assertEquals("testInstance_INDEX_2dda1b099d264f16b1da8f9409c104d3", tableName); + } + + @Test + public void getPcjId() { + final String pcjTableName = "testInstance_INDEX_2dda1b099d264f16b1da8f9409c104d3"; + + // Get the PCJ ID from the table name. + final PcjTableNameFactory factory = new PcjTableNameFactory(); + final String pcjId = factory.getPcjId(pcjTableName); + + // Ensure the pcjId matches the expected id. 
+ assertEquals("2dda1b099d264f16b1da8f9409c104d3", pcjId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java index 68dd3df..bc93a05 100644 --- a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTablesIntegrationTest.java @@ -1,6 +1,4 @@ -package org.apache.rya.indexing.pcj.storage.accumulo; - -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -9,7 +7,7 @@ package org.apache.rya.indexing.pcj.storage.accumulo; * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -18,43 +16,36 @@ package org.apache.rya.indexing.pcj.storage.accumulo; * specific language governing permissions and limitations * under the License. 
*/ +package org.apache.rya.indexing.pcj.storage.accumulo; -import static com.google.common.base.Preconditions.checkNotNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import java.io.File; import java.io.IOException; import java.util.HashSet; import java.util.Map.Entry; import java.util.Set; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.AccumuloRyaDAO; -import mvm.rya.api.RdfCloudTripleStoreConfiguration; -import mvm.rya.rdftriplestore.RdfCloudTripleStore; -import mvm.rya.rdftriplestore.RyaSailRepository; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.ZooKeeperInstance; -import org.apache.accumulo.core.client.security.tokens.PasswordToken; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.hadoop.io.Text; +import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PcjException; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.zookeeper.ClientCnxn; import org.junit.After; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.openrdf.model.Statement; import org.openrdf.model.impl.LiteralImpl; @@ -71,14 +62,19 @@ import com.google.common.base.Optional; import com.google.common.collect.HashMultimap; import 
com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import com.google.common.io.Files; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.MiniAccumuloClusterInstance; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.rdftriplestore.RdfCloudTripleStore; +import mvm.rya.rdftriplestore.RyaSailRepository; /** * Performs integration test using {@link MiniAccumuloCluster} to ensure the * functions of {@link PcjTables} work within a cluster setting. */ public class PcjTablesIntegrationTest { - private static final Logger log = Logger.getLogger(PcjTablesIntegrationTest.class); private static final String USE_MOCK_INSTANCE = ".useMockInstance"; private static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename"; @@ -89,22 +85,69 @@ public class PcjTablesIntegrationTest { protected static final String RYA_TABLE_PREFIX = "demo_"; + // The MiniAccumuloCluster is re-used between tests. + private MiniAccumuloClusterInstance cluster; + // Rya data store and connections. - protected MiniAccumuloCluster accumulo = null; - protected static Connector accumuloConn = null; protected RyaSailRepository ryaRepo = null; protected RepositoryConnection ryaConn = null; + @BeforeClass + public static void killLoudLogs() { + Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); + } + @Before - public void setupMiniResources() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException, RepositoryException { - // Initialize the Mini Accumulo that will be used to store Triples and get a connection to it. - accumulo = startMiniAccumulo(); + public void resetTestEnvironmanet() throws AccumuloException, AccumuloSecurityException, TableNotFoundException, RepositoryException, IOException, InterruptedException { + // Start the cluster. 
+ cluster = new MiniAccumuloClusterInstance(); + cluster.startMiniAccumulo(); // Setup the Rya library to use the Mini Accumulo. - ryaRepo = setupRya(accumulo); + ryaRepo = setupRya(); ryaConn = ryaRepo.getConnection(); } + @After + public void shutdownMiniCluster() throws IOException, InterruptedException, RepositoryException { + // Stop Rya. + ryaRepo.shutDown(); + + // Stop the cluster. + cluster.stopMiniAccumulo(); + } + + /** + * Format a Mini Accumulo to be a Rya repository. + * + * @return The Rya repository sitting on top of the Mini Accumulo. + */ + private RyaSailRepository setupRya() throws AccumuloException, AccumuloSecurityException, RepositoryException { + // Setup the Rya Repository that will be used to create Repository Connections. + final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); + final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); + crdfdao.setConnector( cluster.getConnector() ); + + // Setup Rya configuration values. + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix(RYA_TABLE_PREFIX); + conf.setDisplayQueryPlan(true); + + conf.setBoolean(USE_MOCK_INSTANCE, false); + conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX); + conf.set(CLOUDBASE_USER, cluster.getUsername()); + conf.set(CLOUDBASE_PASSWORD, cluster.getPassword()); + conf.set(CLOUDBASE_INSTANCE, cluster.getInstanceName()); + + crdfdao.setConf(conf); + ryaStore.setRyaDAO(crdfdao); + + final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); + ryaRepo.initialize(); + + return ryaRepo; + } + /** * Ensure that when a new PCJ table is created, it is initialized with the * correct metadata values. 
@@ -112,7 +155,7 @@ public class PcjTablesIntegrationTest { * The method being tested is {@link PcjTables#createPcjTable(Connector, String, Set, String)} */ @Test - public void createPcjTable() throws PcjException { + public void createPcjTable() throws PcjException, AccumuloException, AccumuloSecurityException { final String sparql = "SELECT ?name ?age " + "{" + @@ -121,6 +164,8 @@ public class PcjTablesIntegrationTest { "?name <http://playsSport> \"Soccer\" " + "}"; + final Connector accumuloConn = cluster.getConnector(); + // Create a PCJ table in the Mini Accumulo. final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); @@ -141,7 +186,7 @@ public class PcjTablesIntegrationTest { * The method being tested is {@link PcjTables#addResults(Connector, String, java.util.Collection)} */ @Test - public void addResults() throws PcjException, TableNotFoundException, BindingSetConversionException { + public void addResults() throws PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { final String sparql = "SELECT ?name ?age " + "{" + @@ -150,6 +195,8 @@ public class PcjTablesIntegrationTest { "?name <http://playsSport> \"Soccer\" " + "}"; + final Connector accumuloConn = cluster.getConnector(); + // Create a PCJ table in the Mini Accumulo. final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); @@ -189,6 +236,53 @@ public class PcjTablesIntegrationTest { assertEquals(expectedResults, fetchedResults); } + @Test + public void listResults() throws PCJStorageException, AccumuloException, AccumuloSecurityException { + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." 
+ + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final Connector accumuloConn = cluster.getConnector(); + + // Create a PCJ table in the Mini Accumulo. + final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); + final PcjTables pcjs = new PcjTables(); + pcjs.createPcjTable(accumuloConn, pcjTableName, varOrders, sparql); + + // Add a few results to the PCJ table. + final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + pcjs.addResults(accumuloConn, pcjTableName, Sets.<VisibilityBindingSet>newHashSet( + new VisibilityBindingSet(alice), + new VisibilityBindingSet(bob), + new VisibilityBindingSet(charlie))); + + // Fetch the Binding Sets that have been stored in the PCJ table. + final Set<BindingSet> results = new HashSet<>(); + for(final BindingSet result : pcjs.listResults(accumuloConn, pcjTableName, new Authorizations())) { + results.add( result ); + } + + // Verify the fetched results match the expected ones. + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie); + assertEquals(expected, results); + } + /** * Ensure when results are already stored in Rya, that we are able to populate * the PCJ table for a new SPARQL query using those results. 
@@ -196,7 +290,7 @@ public class PcjTablesIntegrationTest { * The method being tested is: {@link PcjTables#populatePcj(Connector, String, RepositoryConnection, String)} */ @Test - public void populatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException { + public void populatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { // Load some Triples into Rya. final Set<Statement> triples = new HashSet<>(); triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); @@ -221,6 +315,8 @@ public class PcjTablesIntegrationTest { "?name <http://playsSport> \"Soccer\" " + "}"; + final Connector accumuloConn = cluster.getConnector(); + final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); final PcjTables pcjs = new PcjTables(); @@ -264,7 +360,7 @@ public class PcjTablesIntegrationTest { * The method being tested is: {@link PcjTables#createAndPopulatePcj(RepositoryConnection, Connector, String, String, String[], Optional)} */ @Test - public void createAndPopulatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException { + public void createAndPopulatePcj() throws RepositoryException, PcjException, TableNotFoundException, BindingSetConversionException, AccumuloException, AccumuloSecurityException { // Load some Triples into Rya. 
final Set<Statement> triples = new HashSet<>(); triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); @@ -289,6 +385,8 @@ public class PcjTablesIntegrationTest { "?name <http://playsSport> \"Soccer\" " + "}"; + final Connector accumuloConn = cluster.getConnector(); + final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); // Create and populate the PCJ table. @@ -325,7 +423,9 @@ public class PcjTablesIntegrationTest { } @Test - public void listPcjs() throws PCJStorageException { + public void listPcjs() throws PCJStorageException, AccumuloException, AccumuloSecurityException { + final Connector accumuloConn = cluster.getConnector(); + // Set up the table names that will be used. final String instance1 = "instance1_"; final String instance2 = "instance2_"; @@ -358,7 +458,7 @@ public class PcjTablesIntegrationTest { } @Test - public void purge() throws PCJStorageException { + public void purge() throws PCJStorageException, AccumuloException, AccumuloSecurityException { final String sparql = "SELECT ?name ?age " + "{" + @@ -367,6 +467,8 @@ public class PcjTablesIntegrationTest { "?name <http://playsSport> \"Soccer\" " + "}"; + final Connector accumuloConn = cluster.getConnector(); + // Create a PCJ table in the Mini Accumulo. final String pcjTableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "testPcj"); final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(new VariableOrder("name;age")); @@ -404,7 +506,9 @@ public class PcjTablesIntegrationTest { } @Test - public void dropPcj() throws PCJStorageException { + public void dropPcj() throws PCJStorageException, AccumuloException, AccumuloSecurityException { + final Connector accumuloConn = cluster.getConnector(); + // Create a PCJ index. 
final String tableName = new PcjTableNameFactory().makeTableName(RYA_TABLE_PREFIX, "thePcj"); final Set<VariableOrder> varOrders = Sets.<VariableOrder>newHashSet( new VariableOrder("x") ); @@ -457,90 +561,4 @@ public class PcjTablesIntegrationTest { return fetchedResults; } - - @After - public void shutdownMiniResources() { - if(ryaConn != null) { - try { - log.info("Shutting down Rya Connection."); - ryaConn.close(); - log.info("Rya Connection shut down."); - } catch(final Exception e) { - log.error("Could not shut down the Rya Connection.", e); - } - } - - if(ryaRepo != null) { - try { - log.info("Shutting down Rya Repo."); - ryaRepo.shutDown(); - log.info("Rya Repo shut down."); - } catch(final Exception e) { - log.error("Could not shut down the Rya Repo.", e); - } - } - - if(accumulo != null) { - try { - log.info("Shutting down the Mini Accumulo being used as a Rya store."); - accumulo.stop(); - log.info("Mini Accumulo being used as a Rya store shut down."); - } catch(final Exception e) { - log.error("Could not shut down the Mini Accumulo.", e); - } - } - } - - /** - * Setup a Mini Accumulo cluster that uses a temporary directory to store its data. - * - * @return A Mini Accumulo cluster. - */ - private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { - final File miniDataDir = Files.createTempDir(); - - // Setup and start the Mini Accumulo. - final MiniAccumuloCluster accumulo = new MiniAccumuloCluster(miniDataDir, "password"); - accumulo.start(); - - // Store a connector to the Mini Accumulo. - final Instance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers()); - accumuloConn = instance.getConnector("root", new PasswordToken("password")); - - return accumulo; - } - - /** - * Format a Mini Accumulo to be a Rya repository. - * - * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. 
(not null) - * @return The Rya repository sitting on top of the Mini Accumulo. - */ - private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException { - checkNotNull(accumulo); - - // Setup the Rya Repository that will be used to create Repository Connections. - final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); - final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO(); - crdfdao.setConnector(accumuloConn); - - // Setup Rya configuration values. - final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); - conf.setTablePrefix("demo_"); - conf.setDisplayQueryPlan(true); - - conf.setBoolean(USE_MOCK_INSTANCE, true); - conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX); - conf.set(CLOUDBASE_USER, "root"); - conf.set(CLOUDBASE_PASSWORD, "password"); - conf.set(CLOUDBASE_INSTANCE, accumulo.getInstanceName()); - - crdfdao.setConf(conf); - ryaStore.setRyaDAO(crdfdao); - - final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); - ryaRepo.initialize(); - - return ryaRepo; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactoryTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactoryTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactoryTest.java new file mode 100644 index 0000000..f82b735 --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/ShiftVarOrderFactoryTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.util.Set; + +import org.junit.Test; +import org.openrdf.query.MalformedQueryException; + +import com.beust.jcommander.internal.Sets; + +/** + * Tests the methods of {@link ShiftVarOrderFactory}. + */ +public class ShiftVarOrderFactoryTest { + + @Test + public void makeVarOrders_fromSPARQL() throws MalformedQueryException { + // The SPARQL whose PCJ var orders will be generated from. + final String sparql = + "SELECT ?a ?b ?c " + + "WHERE { " + + "?a <http://talksTo> ?b. " + + "?b <http://worksAt> ?c. " + + "}"; + + // Run the test. + final PcjVarOrderFactory factory = new ShiftVarOrderFactory(); + final Set<VariableOrder> varOrders = factory.makeVarOrders(sparql); + + // Ensure the returned set matches the expected results. + final Set<VariableOrder> expected = Sets.newHashSet(); + expected.add( new VariableOrder("a", "b", "c") ); + expected.add( new VariableOrder("c", "a", "b") ); + expected.add( new VariableOrder("b", "c", "a") ); + assertEquals(expected, varOrders); + } + + @Test + public void makeVarOrders_fromVarOrder() { + // The VariableOrder whose PCJ var orders will be generated from. 
+ final VariableOrder varOrder = new VariableOrder("a", "b", "c"); + + // Run the test. + final PcjVarOrderFactory factory = new ShiftVarOrderFactory(); + final Set<VariableOrder> varOrders = factory.makeVarOrders(varOrder); + + // Ensure the returned set matches the expected results. + final Set<VariableOrder> expected = Sets.newHashSet(); + expected.add( new VariableOrder("a", "b", "c") ); + expected.add( new VariableOrder("c", "a", "b") ); + expected.add( new VariableOrder("b", "c", "a") ); + assertEquals(expected, varOrders); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java new file mode 100644 index 0000000..7474207 --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/accumulo/accumulo/AccumuloPcjStorageIT.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.accumulo.accumulo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.ImmutableMap; + +import mvm.rya.accumulo.AccumuloRyaITBase; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; + +/** + * Integration tests the methods of {@link AccumuloPcjStorage}. + * </p> + * These tests ensures that the PCJ tables are maintained and that these operations + * also update the Rya instance's details. 
+ */ +public class AccumuloPcjStorageIT extends AccumuloRyaITBase { + + @Test + public void createPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException { + // Setup the PCJ storage that will be tested against. + final Connector connector = super.getClusterInstance().getConnector(); + final String ryaInstanceName = super.getRyaInstanceName(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); + + // Create a PCJ. + final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + + // Ensure the Rya details have been updated to include the PCJ's ID. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName); + + final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails() + .getPCJIndexDetails() + .getPCJDetails(); + + final PCJDetails expectedDetails = PCJDetails.builder() + .setId( pcjId ) + .build(); + + assertEquals(expectedDetails, detailsMap.get(pcjId)); + } + + @Test + public void dropPCJ() throws AccumuloException, AccumuloSecurityException, PCJStorageException, NotInitializedException, RyaDetailsRepositoryException { + // Setup the PCJ storage that will be tested against. + final Connector connector = super.getClusterInstance().getConnector(); + final String ryaInstanceName = super.getRyaInstanceName(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); + + // Create a PCJ. + final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + + // Delete the PCJ that was just created. + pcjStorage.dropPcj(pcjId); + + // Ensure the Rya details have been updated to no longer include the PCJ's ID. 
+ final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(connector, ryaInstanceName); + + final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails() + .getPCJIndexDetails() + .getPCJDetails(); + + assertFalse( detailsMap.containsKey(pcjId) ); + } + + @Test + public void listPcjs() throws AccumuloException, AccumuloSecurityException, PCJStorageException { + // Setup the PCJ storage that will be tested against. + final Connector connector = super.getClusterInstance().getConnector(); + final String ryaInstanceName = super.getRyaInstanceName(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); + + // Create a few PCJs and hold onto their IDs. + final List<String> expectedIds = new ArrayList<>(); + + String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + expectedIds.add( pcjId ); + + pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + expectedIds.add( pcjId ); + + pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + expectedIds.add( pcjId ); + + // Fetch the PCJ names + final List<String> pcjIds = pcjStorage.listPcjs(); + + // Ensure the expected IDs match the fetched IDs. + Collections.sort(expectedIds); + Collections.sort(pcjIds); + assertEquals(expectedIds, pcjIds); + } + + @Test + public void getPcjMetadata() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException { + // Setup the PCJ storage that will be tested against. + final Connector connector = super.getClusterInstance().getConnector(); + final String ryaInstanceName = super.getRyaInstanceName(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); + + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Fetch the PCJ's metadata. 
+ final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + + // Ensure it has the expected values. + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); + assertEquals(expectedMetadata, metadata); + } + + @Test + public void addResults() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException { + // Setup the PCJ storage that will be tested against. + final Connector connector = super.getClusterInstance().getConnector(); + final String ryaInstanceName = super.getRyaInstanceName(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); + + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Add some binding sets to it. + final Set<VisibilityBindingSet> results = new HashSet<>(); + + final MapBindingSet aliceBS = new MapBindingSet(); + aliceBS.addBinding("a", new URIImpl("http://Alice")); + aliceBS.addBinding("b", new URIImpl("http://Person")); + results.add( new VisibilityBindingSet(aliceBS, "") ); + + final MapBindingSet charlieBS = new MapBindingSet(); + charlieBS.addBinding("a", new URIImpl("http://Charlie")); + charlieBS.addBinding("b", new URIImpl("http://Comedian")); + results.add( new VisibilityBindingSet(charlieBS, "") ); + + pcjStorage.addResults(pcjId, results); + + // Make sure the PCJ metadata was updated. + final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, varOrders); + assertEquals(expectedMetadata, metadata); + } + + @Test + public void listResults() throws AccumuloException, AccumuloSecurityException, PCJStorageException { + // Setup the PCJ storage that will be tested against. 
+ final Connector connector = super.getClusterInstance().getConnector(); + final String ryaInstanceName = super.getRyaInstanceName(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); + + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Add some binding sets to it. + final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); + + final MapBindingSet aliceBS = new MapBindingSet(); + aliceBS.addBinding("a", new URIImpl("http://Alice")); + aliceBS.addBinding("b", new URIImpl("http://Person")); + expectedResults.add( new VisibilityBindingSet(aliceBS, "") ); + + final MapBindingSet charlieBS = new MapBindingSet(); + charlieBS.addBinding("a", new URIImpl("http://Charlie")); + charlieBS.addBinding("b", new URIImpl("http://Comedian")); + expectedResults.add( new VisibilityBindingSet(charlieBS, "") ); + + pcjStorage.addResults(pcjId, expectedResults); + + // List the results that were stored. + final Set<BindingSet> results = new HashSet<>(); + for(final BindingSet result : pcjStorage.listResults(pcjId)) { + results.add( result ); + } + + assertEquals(expectedResults, results); + } + + @Test + public void purge() throws AccumuloException, AccumuloSecurityException, PCJStorageException, MalformedQueryException { + // Setup the PCJ storage that will be tested against. + final Connector connector = super.getClusterInstance().getConnector(); + final String ryaInstanceName = super.getRyaInstanceName(); + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(connector, ryaInstanceName); + + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Add some binding sets to it. 
+ final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); + + final MapBindingSet aliceBS = new MapBindingSet(); + aliceBS.addBinding("a", new URIImpl("http://Alice")); + aliceBS.addBinding("b", new URIImpl("http://Person")); + expectedResults.add( new VisibilityBindingSet(aliceBS, "") ); + + final MapBindingSet charlieBS = new MapBindingSet(); + charlieBS.addBinding("a", new URIImpl("http://Charlie")); + charlieBS.addBinding("b", new URIImpl("http://Comedian")); + expectedResults.add( new VisibilityBindingSet(charlieBS, "") ); + + pcjStorage.addResults(pcjId, expectedResults); + + // Purge the PCJ. + pcjStorage.purge(pcjId); + + // List the results that were stored. + final Set<BindingSet> results = new HashSet<>(); + for(final BindingSet result : pcjStorage.listResults(pcjId)) { + results.add( result ); + } + + assertTrue( results.isEmpty() ); + + // Make sure the PCJ metadata was updated. + final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); + assertEquals(expectedMetadata, metadata); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java index 0a4885c..29d328d 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java +++ b/extras/rya.pcj.fluo/pcj.fluo.api/src/main/java/org/apache/rya/indexing/pcj/fluo/api/CreatePcj.java @@ -20,6 +20,7 @@ package org.apache.rya.indexing.pcj.fluo.api; import static 
com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import static org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants.NODEID_BS_DELIM; import java.util.HashSet; @@ -27,7 +28,6 @@ import java.util.Set; import javax.annotation.ParametersAreNonnullByDefault; -import org.apache.accumulo.core.client.Connector; import org.apache.rya.indexing.pcj.fluo.app.FluoStringConverter; import org.apache.rya.indexing.pcj.fluo.app.StringTypeLayer; import org.apache.rya.indexing.pcj.fluo.app.query.FluoQuery; @@ -37,10 +37,9 @@ import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder; import org.apache.rya.indexing.pcj.fluo.app.query.SparqlFluoQueryBuilder.NodeIds; import org.apache.rya.indexing.pcj.fluo.app.query.StatementPatternMetadata; import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetStringConverter; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; -import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; -import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; @@ -50,13 +49,13 @@ import org.openrdf.query.algebra.StatementPattern; import org.openrdf.query.impl.MapBindingSet; import org.openrdf.query.parser.ParsedQuery; import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.repository.sail.SailRepository; import org.openrdf.sail.SailConnection; import org.openrdf.sail.SailException; import info.aduna.iteration.CloseableIteration; import io.fluo.api.client.FluoClient; import io.fluo.api.types.TypedTransaction; -import 
mvm.rya.rdftriplestore.RyaSailRepository; /** * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. @@ -87,11 +86,6 @@ public class CreatePcj { private static final int DEFAULT_SP_INSERT_BATCH_SIZE = 1000; /** - * A utility used to interact with Rya's PCJ tables. - */ - private static final PcjTables PCJ_TABLES = new PcjTables(); - - /** * The maximum number of binding sets that will be inserted into each Statement * Pattern's result set per Fluo transaction. */ @@ -117,80 +111,58 @@ public class CreatePcj { } /** - * Sets up a new Pre Computed Join (PCJ) in Fluo from a SPARQL query. Historic - * triples will be scanned and matched using the rya connection that was - * provided. The PCJ will also automatically export to a table in Accumulo - * named using the {@code ryaTablePrefix} and the query's ID from the Fluo table. + * Tells the Fluo PCJ Updater application to maintain a new PCJ. + * <p> + * This call scans Rya for Statement Pattern matches and inserts them into + * the Fluo application. The Fluo application will then maintain the intermediate + * results as new triples are inserted and export any new query results to the + * {@code pcjId} within the provided {@code pcjStorage}. * - * @param fluo - A connection to the Fluo table that will be updated. (not null) - * @param ryaTablePrefix - The prefix that will be prepended to the Accumulo table - * the PCJ's results will be exported to. (not null) - * @param rya - A connection to the Rya repository that will be scanned. (not null) - * @param accumuloConn - A connectino to the Accumulo instance the incremental - * results will be exported to as a Rya PCJ table. (not null) - * @param varOrders - The variable orders the query's results will be exported to - * within the export table. If this set is empty, then a default will be - * used instead.(not null) - * @param sparql - The SPARQL query whose results will be incrementally updated by Fluo. 
(not null) - * @throws MalformedQueryException The PCJ could not be initialized because the SPARQL query was malformed. - * @throws PcjException The PCJ could not be initialized because of a problem setting up the export location. - * @throws SailException Historic results could not be added to the initialized PCJ because of - * a problem with the Rya connection. - * @throws QueryEvaluationException Historic results could not be added to the initialized PCJ because of - * a problem with the Rya connection. + * @param pcjId - Identifies the PCJ that will be updated by the Fluo app. (not null) + * @param pcjStorage - Provides access to the PCJ index. (not null) + * @param fluo - A connection to the Fluo application that updates the PCJ index. (not null) + * @param rya - A connection to the Rya instance hosting the PCJ. (not null) + * + * @throws MalformedQueryException The SPARQL query stored for the {@code pcjId} is malformed. + * @throws PcjException The PCJ Metadata for {@code pcjId} could not be read from {@code pcjStorage}. + * @throws SailException Historic PCJ results could not be loaded because of a problem with {@code rya}. + * @throws QueryEvaluationException Historic PCJ results could not be loaded because of a problem with {@code rya}. */ public void withRyaIntegration( + final String pcjId, + final PrecomputedJoinStorage pcjStorage, final FluoClient fluo, - final String ryaTablePrefix, - final RyaSailRepository rya, - final Connector accumuloConn, - final Set<VariableOrder> varOrders, - final String sparql) throws MalformedQueryException, PcjException, SailException, QueryEvaluationException { - checkNotNull(fluo); - checkNotNull(ryaTablePrefix); - checkNotNull(rya); - checkNotNull(accumuloConn); - checkNotNull(varOrders); - checkNotNull(sparql); - - // Parse the SPARQL into a POJO. 
- final SPARQLParser parser = new SPARQLParser(); - final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); + final SailRepository rya) + throws MalformedQueryException, PcjException, SailException, QueryEvaluationException { + requireNonNull(pcjId); + requireNonNull(pcjStorage); + requireNonNull(fluo); + requireNonNull(rya); // Keeps track of the IDs that are assigned to each of the query's nodes in Fluo. // We use these IDs later when scanning Rya for historic Statement Pattern matches // as well as setting up automatic exports. final NodeIds nodeIds = new NodeIds(); - final String exportTableName; - final String queryId; // Parse the query's structure for the metadata that will be written to fluo. + final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = pcjMetadata.getSparql(); + final ParsedQuery parsedQuery = new SPARQLParser().parseQuery(sparql, null); final FluoQuery fluoQuery = new SparqlFluoQueryBuilder().make(parsedQuery, nodeIds); try(TypedTransaction tx = STRING_TYPED_LAYER.wrap( fluo.newTransaction() )) { // Write the query's structure to Fluo. new FluoQueryMetadataDAO().write(tx, fluoQuery); - // Since we are exporting the query's results to a table in Accumulo, store that location in the fluo table. - queryId = fluoQuery.getQueryMetadata().getNodeId(); - - exportTableName = new PcjTableNameFactory().makeTableName(ryaTablePrefix, queryId); - tx.mutate().row(queryId).col(FluoQueryColumns.QUERY_RYA_EXPORT_TABLE_NAME).set(exportTableName); + // The results of the query are eventually exported to an instance of Rya, so store the Rya ID for the PCJ. + final String queryId = fluoQuery.getQueryMetadata().getNodeId(); + tx.mutate().row(queryId).col(FluoQueryColumns.RYA_PCJ_ID).set(pcjId); + tx.mutate().row(pcjId).col(FluoQueryColumns.PCJ_ID_QUERY_ID).set(queryId); // Flush the changes to Fluo. tx.commit(); } - // Initialize the export destination in Accumulo. 
If triples are being written to Fluo - // while this query is being created, then the export observer may throw errors for a while - // until this step is completed. - final VariableOrder queryVarOrder = fluoQuery.getQueryMetadata().getVariableOrder(); - if(varOrders.isEmpty()) { - final Set<VariableOrder> shiftVarOrders = new ShiftVarOrderFactory().makeVarOrders( queryVarOrder ); - varOrders.addAll(shiftVarOrders); - } - PCJ_TABLES.createPcjTable(accumuloConn, exportTableName, varOrders, sparql); - // Get a connection to Rya. It's used to scan for Statement Pattern results. final SailConnection ryaConn = rya.getSail().getConnection();
