RYA-260 Fluo PCJ application has had Aggregation support added to it. Also fixed a bunch of resource leaks that were causing integration tests to fail. Closes #156.
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/c941aea8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/c941aea8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/c941aea8 Branch: refs/heads/master Commit: c941aea8b65acb99b451757c48279914d9488c85 Parents: be9ea9a Author: Kevin Chilton <[email protected]> Authored: Fri Apr 7 15:57:57 2017 -0400 Committer: Caleb Meier <[email protected]> Committed: Mon Apr 24 07:58:25 2017 -0700 ---------------------------------------------------------------------- .../AccumuloRyaInstanceDetailsRepository.java | 20 +- .../accumulo/utils/VisibilitySimplifier.java | 30 +- .../utils/VisibilitySimplifierTest.java | 24 + .../rya/accumulo/utils/RyaTableNames.java | 11 +- .../client/accumulo/AccumuloBatchUpdatePCJ.java | 87 ++- .../api/client/accumulo/AccumuloCreatePCJ.java | 86 ++- .../api/client/accumulo/AccumuloDeletePCJ.java | 38 +- .../pcj/matching/AccumuloIndexSetProvider.java | 62 +- .../accumulo/AccumuloBatchUpdatePCJIT.java | 10 +- .../client/accumulo/AccumuloCreatePCJIT.java | 72 +- .../client/accumulo/AccumuloDeletePCJIT.java | 46 +- .../benchmark/query/QueryBenchmarkRunIT.java | 29 +- .../pcj/storage/PrecomputedJoinStorage.java | 22 +- .../storage/accumulo/AccumuloPcjSerializer.java | 20 - .../storage/accumulo/AccumuloPcjStorage.java | 8 +- .../storage/accumulo/BindingSetConverter.java | 9 +- .../accumulo/BindingSetStringConverter.java | 53 +- .../pcj/storage/accumulo/PcjTables.java | 39 +- .../accumulo/ScannerBindingSetIterator.java | 18 +- .../pcj/storage/accumulo/VariableOrder.java | 15 +- .../pcj/update/PrecomputedJoinUpdater.java | 10 +- .../accumulo/AccumuloPcjSerializerTest.java | 185 +++++ .../accumulo/AccumuloPcjSerialzerTest.java | 175 ----- .../accumulo/BindingSetStringConverterTest.java | 42 +- .../accumulo/PcjTablesIntegrationTest.java | 26 +- .../accumulo/accumulo/AccumuloPcjStorageIT.java | 284 ++++---- .../rya/indexing/pcj/fluo/api/CreatePcj.java | 277 ++++---- .../rya/indexing/pcj/fluo/api/DeletePcj.java | 28 +- extras/rya.pcj.fluo/pcj.fluo.app/pom.xml | 10 +- .../pcj/fluo/app/AggregationResultUpdater.java | 572 +++++++++++++++ .../indexing/pcj/fluo/app/BindingSetRow.java | 11 +- .../pcj/fluo/app/FilterResultUpdater.java | 57 +- .../rya/indexing/pcj/fluo/app/IncUpdateDAO.java | 95 +-- .../fluo/app/IncrementalUpdateConstants.java | 1 + .../pcj/fluo/app/JoinResultUpdater.java | 131 ++-- .../rya/indexing/pcj/fluo/app/NodeType.java | 11 +- .../pcj/fluo/app/QueryResultUpdater.java | 69 +- .../pcj/fluo/app/VisibilityBindingSetSerDe.java | 77 +++ .../app/export/IncrementalResultExporter.java | 12 +- .../app/export/kafka/KafkaResultExporter.java | 32 +- .../fluo/app/export/rya/RyaResultExporter.java | 16 +- .../fluo/app/observers/AggregationObserver.java | 74 ++ .../fluo/app/observers/BindingSetUpdater.java | 53 +- .../pcj/fluo/app/observers/FilterObserver.java | 25 +- .../pcj/fluo/app/observers/JoinObserver.java | 24 +- .../fluo/app/observers/QueryResultObserver.java | 54 +- .../app/observers/StatementPatternObserver.java | 25 +- .../pcj/fluo/app/observers/TripleObserver.java | 158 +++-- .../pcj/fluo/app/query/AggregationMetadata.java | 371 ++++++++++ .../indexing/pcj/fluo/app/query/FluoQuery.java | 111 ++- .../pcj/fluo/app/query/FluoQueryColumns.java | 60 +- .../fluo/app/query/FluoQueryMetadataDAO.java | 186 ++++- .../fluo/app/query/SparqlFluoQueryBuilder.java | 116 +++- .../pcj/fluo/app/util/BindingSetUtil.java | 54 ++ .../indexing/pcj/fluo/app/util/RowKeyUtil.java | 69 ++ .../fluo/app/VisibilityBindingSetSerDeTest.java | 51 ++ .../fluo/client/command/NewQueryCommand.java | 2 +- .../pcj/fluo/demo/FluoAndHistoricPcjsDemo.java | 38 +- .../rya.pcj.fluo/pcj.fluo.integration/pom.xml | 13 +- .../apache/rya/indexing/pcj/fluo/ITBase.java | 443 ------------ .../indexing/pcj/fluo/KafkaExportITBase.java | 315 +++++++++ .../rya/indexing/pcj/fluo/RyaExportITBase.java | 182 +++++ .../pcj/fluo/api/CountStatementsIT.java | 54 +- .../indexing/pcj/fluo/api/GetPcjMetadataIT.java | 91 +-- .../indexing/pcj/fluo/api/GetQueryReportIT.java | 107 +-- .../indexing/pcj/fluo/api/ListQueryIdsIT.java | 35 +- .../fluo/app/query/FluoQueryMetadataDAOIT.java | 204 ++++-- .../pcj/fluo/integration/CreateDeleteIT.java | 166 +++-- .../indexing/pcj/fluo/integration/InputIT.java | 275 +++++--- .../pcj/fluo/integration/KafkaExportIT.java | 693 ++++++++++++------- .../indexing/pcj/fluo/integration/QueryIT.java | 580 ++++++++-------- .../pcj/fluo/integration/RyaExportIT.java | 101 +-- .../RyaInputIncrementalUpdateIT.java | 245 ++++--- .../pcj/fluo/integration/StreamingTestIT.java | 140 ++-- .../HistoricStreamingVisibilityIT.java | 80 ++- .../pcj/fluo/visibility/PcjVisibilityIT.java | 199 +++--- .../rya.pcj.fluo/rya.pcj.functions.geo/pom.xml | 44 +- .../rya/indexing/pcj/fluo/RyaExportITBase.java | 182 +++++ .../pcj/functions/geo/GeoFunctionsIT.java | 471 ++++++------- pom.xml | 16 + 80 files changed, 5719 insertions(+), 3208 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java index be8e12c..dcd64de 100644 --- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/instance/AccumuloRyaInstanceDetailsRepository.java @@ -22,9 +22,6 @@ import static java.util.Objects.requireNonNull; import java.util.Map.Entry; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; @@ -48,6 +45,9 @@ import org.apache.hadoop.io.Text; import org.apache.rya.api.instance.RyaDetails; import org.apache.rya.api.instance.RyaDetailsRepository; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * An implementation of {@link RyaDetailsRepository} that stores a Rya * instance's {@link RyaDetails} in an Accumulo table. @@ -89,12 +89,17 @@ public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepositor @Override public boolean isInitialized() throws RyaDetailsRepositoryException { + Scanner scanner = null; try { - final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations()); + scanner = connector.createScanner(detailsTableName, new Authorizations()); scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER); return scanner.iterator().hasNext(); } catch (final TableNotFoundException e) { return false; + } finally { + if(scanner != null) { + scanner.close(); + } } } @@ -157,9 +162,10 @@ public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepositor } // Read it from the table. + Scanner scanner = null; try { // Fetch the value from the table. - final Scanner scanner = connector.createScanner(detailsTableName, new Authorizations()); + scanner = connector.createScanner(detailsTableName, new Authorizations()); scanner.fetchColumn(COL_FAMILY, COL_QUALIFIER); final Entry<Key, Value> entry = scanner.iterator().next(); @@ -169,6 +175,10 @@ public class AccumuloRyaInstanceDetailsRepository implements RyaDetailsRepositor } catch (final TableNotFoundException e) { throw new RyaDetailsRepositoryException("Could not get the details from the table.", e); + } finally { + if(scanner != null) { + scanner.close(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java index 98c6abd..8fa3b0e 100644 --- a/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java +++ b/dao/accumulo.rya/src/main/java/org/apache/rya/accumulo/utils/VisibilitySimplifier.java @@ -20,13 +20,13 @@ package org.apache.rya.accumulo.utils; import static java.util.Objects.requireNonNull; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.accumulo.core.security.ColumnVisibility; import com.google.common.base.Charsets; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * Simplifies Accumulo visibility expressions. */ @@ -34,12 +34,34 @@ import com.google.common.base.Charsets; public class VisibilitySimplifier { /** + * Unions two visibility equations and then simplifies the result. + * + * @param vis1 - The first visibility equation that will be unioned. (not null) + * @param vis2 - The other visibility equation that will be unioned. (not null) + * @return A simplified form of the unioned visibility equations. + */ + public static String unionAndSimplify(final String vis1, final String vis2) { + requireNonNull(vis1); + requireNonNull(vis2); + + if(vis1.isEmpty()) { + return vis2; + } + + if(vis2.isEmpty()) { + return vis1; + } + + return simplify("(" + vis1 + ")&(" + vis2 + ")"); + } + + /** * Simplifies an Accumulo visibility expression. * * @param visibility - The expression to simplify. (not null) * @return A simplified form of {@code visibility}. */ - public String simplify(final String visibility) { + public static String simplify(final String visibility) { requireNonNull(visibility); String last = visibility; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java b/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java index a9a03ce..0adb325 100644 --- a/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java +++ b/dao/accumulo.rya/src/test/java/org/apache/rya/accumulo/utils/VisibilitySimplifierTest.java @@ -50,4 +50,28 @@ public class VisibilitySimplifierTest { final String simplified = new VisibilitySimplifier().simplify("(a|b)|(a|b)|a|b"); assertEquals("a|b", simplified); } + + @Test + public void unionAndSimplify() { + final String simplified = new VisibilitySimplifier().unionAndSimplify("u&b", "u"); + assertEquals("b&u", simplified); + } + + @Test + public void unionAndSimplify_firstIsEmpty() { + final String simplified = new VisibilitySimplifier().unionAndSimplify("", "u"); + assertEquals("u", simplified); + } + + @Test + public void unionAndSimplify_secondIsEmpty() { + final String simplified = new VisibilitySimplifier().unionAndSimplify("u", ""); + assertEquals("u", simplified); + } + + @Test + public void unionAndSimplify_bothAreEmpty() { + final String simplified = new VisibilitySimplifier().unionAndSimplify("", ""); + assertEquals("", simplified); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java b/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java index faeebbb..cd17cbc 100644 --- a/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java +++ b/extras/indexing/src/main/java/org/apache/rya/accumulo/utils/RyaTableNames.java @@ -34,6 +34,7 @@ import org.apache.rya.api.layout.TablePrefixLayoutStrategy; import org.apache.rya.indexing.accumulo.entity.EntityCentricIndex; import org.apache.rya.indexing.accumulo.freetext.AccumuloFreeTextIndexer; import org.apache.rya.indexing.accumulo.temporal.AccumuloTemporalIndexer; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; @@ -93,11 +94,13 @@ public class RyaTableNames { */ if(details.getPCJIndexDetails().isEnabled()) { - final List<String> pcjIds = new AccumuloPcjStorage(conn, ryaInstanceName).listPcjs(); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(conn, ryaInstanceName)) { + final List<String> pcjIds = pcjStorage.listPcjs(); - final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory(); - for(final String pcjId : pcjIds) { - tables.add( tableNameFactory.makeTableName(ryaInstanceName, pcjId) ); + final PcjTableNameFactory tableNameFactory = new PcjTableNameFactory(); + for(final String pcjId : pcjIds) { + tables.add( tableNameFactory.makeTableName(ryaInstanceName, pcjId) ); + } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java index 790fe80..76aad02 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJ.java @@ -28,23 +28,6 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; import org.apache.log4j.Logger; -import org.apache.rya.indexing.pcj.storage.PcjMetadata; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; -import org.openrdf.query.BindingSet; -import org.openrdf.query.MalformedQueryException; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.parser.ParsedQuery; -import org.openrdf.query.parser.sparql.SPARQLParser; -import org.openrdf.sail.Sail; -import org.openrdf.sail.SailConnection; -import org.openrdf.sail.SailException; - -import com.google.common.base.Optional; - -import info.aduna.iteration.CloseableIteration; import org.apache.rya.accumulo.AccumuloRdfConfiguration; import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; import org.apache.rya.api.client.BatchUpdatePCJ; @@ -62,8 +45,25 @@ import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; import org.apache.rya.api.persist.RyaDAOException; import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.apache.rya.rdftriplestore.inference.InferenceEngineException; import org.apache.rya.sail.config.RyaSailFactory; +import org.openrdf.query.BindingSet; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.parser.ParsedQuery; +import org.openrdf.query.parser.sparql.SPARQLParser; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailConnection; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; + +import info.aduna.iteration.CloseableIteration; /** * Uses an in memory Rya Client to batch update a PCJ index. @@ -126,12 +126,11 @@ public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpda SailConnection sailConn = null; CloseableIteration<? extends BindingSet, QueryEvaluationException> results = null; - try { + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName)) { // Create an instance of Sail backed by the Rya instance. sail = connectToRya(ryaInstanceName); // Purge the old results from the PCJ. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), ryaInstanceName); try { pcjStorage.purge(pcjId); } catch (final PCJStorageException e) { @@ -139,37 +138,35 @@ public class AccumuloBatchUpdatePCJ extends AccumuloCommand implements BatchUpda "results could not be purged from it.", e); } - try { - // Parse the PCJ's SPARQL query. - final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); - final String sparql = metadata.getSparql(); - final SPARQLParser parser = new SPARQLParser(); - final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); - - // Execute the query. - sailConn = sail.getConnection(); - results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false); - - // Load the results into the PCJ table. - final List<VisibilityBindingSet> batch = new ArrayList<>(1000); - - while(results.hasNext()) { - final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), ""); - batch.add(result); - - if(batch.size() == 1000) { - pcjStorage.addResults(pcjId, batch); - batch.clear(); - } - } + // Parse the PCJ's SPARQL query. + final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + final String sparql = metadata.getSparql(); + final SPARQLParser parser = new SPARQLParser(); + final ParsedQuery parsedQuery = parser.parseQuery(sparql, null); - if(!batch.isEmpty()) { + // Execute the query. + sailConn = sail.getConnection(); + results = sailConn.evaluate(parsedQuery.getTupleExpr(), null, null, false); + + // Load the results into the PCJ table. + final List<VisibilityBindingSet> batch = new ArrayList<>(1000); + + while(results.hasNext()) { + final VisibilityBindingSet result = new VisibilityBindingSet(results.next(), ""); + batch.add(result); + + if(batch.size() == 1000) { pcjStorage.addResults(pcjId, batch); batch.clear(); } - } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) { - throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e); } + + if(!batch.isEmpty()) { + pcjStorage.addResults(pcjId, batch); + batch.clear(); + } + } catch(final MalformedQueryException | PCJStorageException | SailException | QueryEvaluationException e) { + throw new RyaClientException("Fail to batch load new results into the PCJ with ID '" + pcjId + "'.", e); } finally { if(results != null) { try { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java index ac8da66..3fe1042 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJ.java @@ -90,47 +90,46 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { // Create the PCJ table that will receive the index results. final String pcjId; - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName); - try { + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName)) { pcjId = pcjStorage.createPcj(sparql); - } catch (final PCJStorageException e) { - throw new RyaClientException("Problem while initializing the PCJ table.", e); - } - // If a Fluo application is being used, task it with updating the PCJ. - final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); - if(fluoDetailsHolder.isPresent()) { - final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); - try { - updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId); - } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) { - throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); + // If a Fluo application is being used, task it with updating the PCJ. + final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); + if(fluoDetailsHolder.isPresent()) { + final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); + try { + updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId); + } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException | RyaDAOException e) { + throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); + } + + // Update the Rya Details to indicate the PCJ is being updated incrementally. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); + try { + new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { + @Override + public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { + // Update the original PCJ Details to indicate they are incrementally updated. + final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); + final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) + .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL ); + + // Replace the old PCJ Details with the updated ones. + final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); + builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); + return builder.build(); + } + }); + } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) { + throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e); + } } - // Update the Rya Details to indicate the PCJ is being updated incrementally. - final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); - try { - new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { - @Override - public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { - // Update the original PCJ Details to indicate they are incrementally updated. - final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); - final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) - .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL ); - - // Replace the old PCJ Details with the updated ones. - final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); - builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); - return builder.build(); - } - }); - } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) { - throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e); - } + // Return the ID that was assigned to the PCJ. + return pcjId; + } catch (final PCJStorageException e) { + throw new RyaClientException("Problem while initializing the PCJ table.", e); } - - // Return the ID that was assigned to the PCJ. - return pcjId; } private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException, RyaDAOException { @@ -139,16 +138,15 @@ public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { // Connect to the Fluo application that is updating this instance's PCJs. final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); - final FluoClient fluoClient = new FluoClientFactory().connect( + try(final FluoClient fluoClient = new FluoClientFactory().connect( cd.getUsername(), new String(cd.getPassword()), cd.getInstanceName(), cd.getZookeepers(), - fluoAppName); - - // Initialize the PCJ within the Fluo application. - final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj(); - fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance); + fluoAppName);) { + // Initialize the PCJ within the Fluo application. + final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj(); + fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, getConnector(), ryaInstance); + } } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java index b6728ec..96e6d58 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJ.java @@ -20,19 +20,7 @@ package org.apache.rya.api.client.accumulo; import static java.util.Objects.requireNonNull; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.accumulo.core.client.Connector; -import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; -import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; -import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; - import org.apache.fluo.api.client.FluoClient; import org.apache.rya.api.client.DeletePCJ; import org.apache.rya.api.client.GetInstanceDetails; @@ -43,6 +31,17 @@ import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * An Accumulo implementation of the {@link DeletePCJ} command. @@ -104,8 +103,7 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ { } // Drop the table that holds the PCJ results from Accumulo. - final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(getConnector(), instanceName); - try { + try(final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(getConnector(), instanceName)) { pcjs.dropPcj(pcjId); } catch (final PCJStorageException e) { throw new RyaClientException("Could not drop the PCJ's table from Accumulo.", e); @@ -118,14 +116,14 @@ public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ { // Connect to the Fluo application that is updating this instance's PCJs. final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); - final FluoClient fluoClient = new FluoClientFactory().connect( + try(final FluoClient fluoClient = new FluoClientFactory().connect( cd.getUsername(), new String(cd.getPassword()), cd.getInstanceName(), cd.getZookeepers(), - fluoAppName); - - // Delete the PCJ from the Fluo App. - new DeletePcj(1000).deletePcj(fluoClient, pcjId); + fluoAppName)) { + // Delete the PCJ from the Fluo App. + new DeletePcj(1000).deletePcj(fluoClient, pcjId); + } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java index 828ee4b..1940e64 100644 --- a/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java +++ b/extras/indexing/src/main/java/org/apache/rya/indexing/pcj/matching/AccumuloIndexSetProvider.java @@ -17,24 +17,6 @@ * under the License. */ package org.apache.rya.indexing.pcj.matching; -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ import static java.util.Objects.requireNonNull; @@ -184,29 +166,31 @@ public class AccumuloIndexSetProvider implements ExternalSetProvider<ExternalTup } // this maps associates pcj table name with pcj sparql query final Map<String, String> indexTables = Maps.newLinkedHashMap(); - final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix); - final PcjTableNameFactory pcjFactory = new PcjTableNameFactory(); - final boolean tablesProvided = tables != null && !tables.isEmpty(); + try(final PrecomputedJoinStorage storage = new AccumuloPcjStorage(conn, tablePrefix)) { + final PcjTableNameFactory pcjFactory = new PcjTableNameFactory(); - if (tablesProvided) { - // if tables provided, associate table name with sparql - for (final String table : tables) { - indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql()); - } - } else if (hasRyaDetails(tablePrefix, conn)) { - // If this is a newer install of Rya, and it has PCJ Details, then - // use those. - final List<String> ids = storage.listPcjs(); - for (final String id : ids) { - indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql()); - } - } else { - // Otherwise figure it out by scanning tables. - final PcjTables pcjTables = new PcjTables(); - for (final String table : conn.tableOperations().list()) { - if (table.startsWith(tablePrefix + "INDEX")) { - indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql()); + final boolean tablesProvided = tables != null && !tables.isEmpty(); + + if (tablesProvided) { + // if tables provided, associate table name with sparql + for (final String table : tables) { + indexTables.put(table, storage.getPcjMetadata(pcjFactory.getPcjId(table)).getSparql()); + } + } else if (hasRyaDetails(tablePrefix, conn)) { + // If this is a newer install of Rya, and it has PCJ Details, then + // use those. + final List<String> ids = storage.listPcjs(); + for (final String id : ids) { + indexTables.put(pcjFactory.makeTableName(tablePrefix, id), storage.getPcjMetadata(id).getSparql()); + } + } else { + // Otherwise figure it out by scanning tables. + final PcjTables pcjTables = new PcjTables(); + for (final String table : conn.tableOperations().list()) { + if (table.startsWith(tablePrefix + "INDEX")) { + indexTables.put(table, pcjTables.getPcjMetadata(conn, table).getSparql()); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java index 30eb4ca..5a2e69d 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloBatchUpdatePCJIT.java @@ -31,6 +31,7 @@ import org.apache.rya.indexing.accumulo.ConfigUtils; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; import org.apache.rya.sail.config.RyaSailFactory; import org.junit.Test; @@ -63,7 +64,7 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase { .build()); Sail sail = null; - try { + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME)) { // Get a Sail connection backed by the installed Rya instance. final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); ryaConf.setTablePrefix(RYA_INSTANCE_NAME); @@ -102,7 +103,6 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase { sailConn.close(); // Create a PCJ for a SPARQL query. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(super.getConnector(), RYA_INSTANCE_NAME); final String sparql = "SELECT ?name WHERE { ?name <urn:likes> <urn:icecream> . ?name <urn:hasEyeColor> <urn:blue> . }"; final String pcjId = pcjStorage.createPcj(sparql); @@ -137,8 +137,10 @@ public class AccumuloBatchUpdatePCJIT extends AccumuloITBase { expectedResults.add(bs); final Set<BindingSet> results = new HashSet<>(); - for(final BindingSet result : pcjStorage.listResults(pcjId)) { - results.add( result ); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } } assertEquals(expectedResults, results); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java index 6c9bf5e..f900837 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloCreatePCJIT.java @@ -24,6 +24,15 @@ import static org.junit.Assert.assertFalse; import java.util.List; import java.util.Set; +import org.apache.rya.api.client.CreatePCJ; +import org.apache.rya.api.client.Install; +import org.apache.rya.api.client.Install.DuplicateInstanceNameException; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds; import org.apache.rya.indexing.pcj.storage.PcjMetadata; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; @@ -36,16 +45,6 @@ import org.openrdf.query.impl.MapBindingSet; import com.google.common.base.Optional; import com.google.common.collect.Sets; -import org.apache.rya.api.client.CreatePCJ; -import org.apache.rya.api.client.Install; -import org.apache.rya.api.client.Install.DuplicateInstanceNameException; -import org.apache.rya.api.client.Install.InstallConfiguration; -import org.apache.rya.api.client.InstanceDoesNotExistException; -import org.apache.rya.api.client.RyaClientException; -import org.apache.rya.api.instance.RyaDetails; -import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; -import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; - /** * Integration tests the methods of {@link AccumuloCreatePCJ}. */ @@ -80,42 +79,43 @@ public class AccumuloCreatePCJIT extends FluoITBase { assertEquals(PCJUpdateStrategy.INCREMENTAL, pcjDetails.getUpdateStrategy().get()); // Verify the PCJ's metadata was initialized. - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); - assertEquals(sparql, pcjMetadata.getSparql()); - assertEquals(0L, pcjMetadata.getCardinality()); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) { + final PcjMetadata pcjMetadata = pcjStorage.getPcjMetadata(pcjId); + assertEquals(sparql, pcjMetadata.getSparql()); + assertEquals(0L, pcjMetadata.getCardinality()); - // Verify a Query ID was added for the query within the Fluo app. - final List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient); - assertEquals(1, fluoQueryIds.size()); + // Verify a Query ID was added for the query within the Fluo app. + final List<String> fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient); + assertEquals(1, fluoQueryIds.size()); - // Insert some statements into Rya. - final ValueFactory vf = ryaRepo.getValueFactory(); - ryaConn.add(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); - ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); - ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); + // Insert some statements into Rya. + final ValueFactory vf = ryaRepo.getValueFactory(); + ryaConn.add(vf.createURI("http://Alice"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); + ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); + ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://talksTo"), vf.createURI("http://Eve")); - ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")); + ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://helps"), vf.createURI("http://Kevin")); - ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); - ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); - ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); - ryaConn.add(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://Bob"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://Charlie"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://Eve"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); + ryaConn.add(vf.createURI("http://David"), vf.createURI("http://worksAt"), vf.createURI("http://TacoJoint")); - // Verify the correct results were exported. - fluo.waitForObservers(); + // Verify the correct results were exported. + fluo.waitForObservers(); - final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); + final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); - final MapBindingSet bob = new MapBindingSet(); - bob.addBinding("x", vf.createURI("http://Bob")); + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("x", vf.createURI("http://Bob")); - final MapBindingSet charlie = new MapBindingSet(); - charlie.addBinding("x", vf.createURI("http://Charlie")); + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("x", vf.createURI("http://Charlie")); - final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie); + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie); - assertEquals(expected, results); + assertEquals(expected, results); + } } @Test(expected = InstanceDoesNotExistException.class) http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java index 573fccd..fd75167 100644 --- a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloDeletePCJIT.java @@ -24,6 +24,10 @@ import static org.junit.Assert.assertTrue; import java.util.List; import java.util.Set; +import org.apache.rya.api.client.CreatePCJ; +import org.apache.rya.api.client.DeletePCJ; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClientException; import org.apache.rya.indexing.pcj.fluo.api.ListQueryIds; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; @@ -36,11 +40,6 @@ import org.openrdf.repository.RepositoryException; import com.google.common.collect.Sets; -import org.apache.rya.api.client.CreatePCJ; -import org.apache.rya.api.client.DeletePCJ; -import org.apache.rya.api.client.InstanceDoesNotExistException; -import org.apache.rya.api.client.RyaClientException; - /** * Integration tests the methods of {@link AccumuloCreatePCJ}. */ @@ -86,31 +85,32 @@ public class AccumuloDeletePCJIT extends FluoITBase { // Verify the correct results were exported. fluo.waitForObservers(); - final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME); - final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); + try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, RYA_INSTANCE_NAME)) { + final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) ); - final MapBindingSet bob = new MapBindingSet(); - bob.addBinding("x", vf.createURI("http://Bob")); + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("x", vf.createURI("http://Bob")); - final MapBindingSet charlie = new MapBindingSet(); - charlie.addBinding("x", vf.createURI("http://Charlie")); + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("x", vf.createURI("http://Charlie")); - final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie); - assertEquals(expected, results); + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(bob, charlie); + assertEquals(expected, results); - // Delete the PCJ. - final DeletePCJ deletePCJ = new AccumuloDeletePCJ(connectionDetails, accumuloConn); - deletePCJ.deletePCJ(RYA_INSTANCE_NAME, pcjId); + // Delete the PCJ. + final DeletePCJ deletePCJ = new AccumuloDeletePCJ(connectionDetails, accumuloConn); + deletePCJ.deletePCJ(RYA_INSTANCE_NAME, pcjId); - // Ensure the PCJ's metadata has been removed from the storage. - assertTrue( pcjStorage.listPcjs().isEmpty() ); + // Ensure the PCJ's metadata has been removed from the storage. + assertTrue( pcjStorage.listPcjs().isEmpty() ); - // Ensure the PCJ has been removed from the Fluo application. - fluo.waitForObservers(); + // Ensure the PCJ has been removed from the Fluo application. + fluo.waitForObservers(); - // Verify a Query ID was added for the query within the Fluo app. - fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient); - assertEquals(0, fluoQueryIds.size()); + // Verify a Query ID was added for the query within the Fluo app. + fluoQueryIds = new ListQueryIds().listQueryIds(fluoClient); + assertEquals(0, fluoQueryIds.size()); + } } @Test(expected = InstanceDoesNotExistException.class) http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java index c36cb2c..dd5fe68 100644 --- a/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java +++ b/extras/rya.benchmark/src/test/java/org/apache/rya/benchmark/query/QueryBenchmarkRunIT.java @@ -25,10 +25,19 @@ import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.minicluster.MiniAccumuloConfig; import org.apache.log4j.Level; import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; +import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun; import org.apache.rya.benchmark.query.QueryBenchmark.QueryBenchmarkRun.NotEnoughResultsException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; +import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.sail.config.RyaSailFactory; import org.apache.zookeeper.ClientCnxn; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -38,16 +47,6 @@ import org.openrdf.sail.Sail; import org.openrdf.sail.SailConnection; import org.openrdf.sail.SailException; -import org.apache.rya.accumulo.AccumuloRdfConfiguration; -import org.apache.rya.api.client.Install.InstallConfiguration; -import org.apache.rya.api.client.RyaClient; -import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails; -import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory; -import org.apache.rya.indexing.accumulo.ConfigUtils; -import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; -import org.apache.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; -import org.apache.rya.sail.config.RyaSailFactory; - /** * Integration tests {@link QueryBenchmarkRun}. */ @@ -145,12 +144,12 @@ public class QueryBenchmarkRunIT { private static void createTestPCJ(final RyaClient ryaClient) throws Exception { // Create an empty PCJ within the Rya instance's PCJ storage for the test query. - final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(cluster.getConnector(ACCUMULO_USER, ACCUMULO_PASSWORD), RYA_INSTANCE_NAME); - final String pcjId = pcjs.createPcj(SPARQL_QUERY); + try(final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(cluster.getConnector(ACCUMULO_USER, ACCUMULO_PASSWORD), RYA_INSTANCE_NAME)) { + final String pcjId = pcjs.createPcj(SPARQL_QUERY); - - // Batch update the PCJ using the Rya Client. - ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId); + // Batch update the PCJ using the Rya Client. + ryaClient.getBatchUpdatePCJ().batchUpdate(RYA_INSTANCE_NAME, pcjId); + } } @AfterClass http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java index 16653ee..38ae1b2 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/PrecomputedJoinStorage.java @@ -22,17 +22,17 @@ import java.util.Collection; import java.util.Iterator; import java.util.List; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.rya.indexing.pcj.storage.accumulo.VisibilityBindingSet; import org.openrdf.query.BindingSet; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * Functions that create and maintain the PCJ tables that are used by Rya. */ @DefaultAnnotation(NonNull.class) -public interface PrecomputedJoinStorage { +public interface PrecomputedJoinStorage extends AutoCloseable { /** * Get a list of all Precomputed Join indices that are being maintained. @@ -75,7 +75,7 @@ public interface PrecomputedJoinStorage { * results for the PCJ. * @throws PCJStorageException The scan couldn't be performed. */ - public Iterable<BindingSet> listResults(String pcjId) throws PCJStorageException; + public CloseableIterator<BindingSet> listResults(String pcjId) throws PCJStorageException; /** * Clears all values from a Precomputed Join index. The index will remain, @@ -94,15 +94,25 @@ public interface PrecomputedJoinStorage { */ public void dropPcj(final String pcjId) throws PCJStorageException; - /** * Releases and resources that are being used by the storage. * * @throws PCJStorageException Indicates the resources could not be released. */ + @Override public void close() throws PCJStorageException; /** + * An {@link Iterator} that also extends {@link AutoCloseable} because it has reference to resources + * that need to be released once you are done iterating. + * + * @param <T> The type of object that is iterated over. + */ + public static interface CloseableIterator<T> extends Iterator<T>, AutoCloseable { + + } + + /** * An operation of {@link PrecomputedJoinStorage} failed. */ public static class PCJStorageException extends PcjException { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java index 4769758..999b26f 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjSerializer.java @@ -34,7 +34,6 @@ import edu.umd.cs.findbugs.annotations.DefaultAnnotation; import edu.umd.cs.findbugs.annotations.NonNull; import org.openrdf.model.Value; -import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.algebra.evaluation.QueryBindingSet; @@ -57,7 +56,6 @@ public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> { public byte[] convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException { checkNotNull(bindingSet); checkNotNull(varOrder); - checkBindingsSubsetOfVarOrder(bindingSet, varOrder); // A list that holds all of the byte segments that will be concatenated at the end. // This minimizes byte[] construction. @@ -112,24 +110,6 @@ public class AccumuloPcjSerializer implements BindingSetConverter<byte[]> { } } - /** - * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet} - * are a subset of the variables names in {@link VariableOrder}. - * - * @param bindingSet - The binding set whose Bindings will be inspected. (not null) - * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null) - * @throws IllegalArgumentException Indicates the names of the bindings are - * not a subset of the variable order. - */ - private static void checkBindingsSubsetOfVarOrder(BindingSet bindingSet, VariableOrder varOrder) throws IllegalArgumentException { - checkNotNull(bindingSet); - checkNotNull(varOrder); - - Set<String> bindingNames = bindingSet.getBindingNames(); - List<String> varNames = varOrder.getVariableOrders(); - checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder."); - } - private static final byte[] concat(Iterable<byte[]> byteSegments) { checkNotNull(byteSegments); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java index 6024d12..b8974e6 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/AccumuloPcjStorage.java @@ -25,9 +25,6 @@ import java.util.Collection; import java.util.List; import java.util.Set; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Connector; @@ -47,6 +44,9 @@ import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; import org.openrdf.query.BindingSet; import org.openrdf.query.MalformedQueryException; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * An Accumulo backed implementation of {@link PrecomputedJoinStorage}. */ @@ -156,7 +156,7 @@ public class AccumuloPcjStorage implements PrecomputedJoinStorage { } @Override - public Iterable<BindingSet> listResults(final String pcjId) throws PCJStorageException { + public CloseableIterator<BindingSet> listResults(final String pcjId) throws PCJStorageException { requireNonNull(pcjId); try { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java index d2cf366..c920824 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetConverter.java @@ -18,12 +18,12 @@ */ package org.apache.rya.indexing.pcj.storage.accumulo; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * Converts {@link BindingSet}s into other representations. This library is * intended to convert between BindingSet and whatever format it is being @@ -52,8 +52,7 @@ public interface BindingSetConverter<T> { * resulting model. (not null) * @return The BindingSet formatted as the target model. * @throws BindingSetConversionException The BindingSet was unable to be - * converted into the target model. This will happen if the BindingSet has - * a binding whose name is not in the VariableOrder or if one of the values + * converted into the target model. This will happen if one of the values * could not be converted into the target model. */ public T convert(BindingSet bindingSet, VariableOrder varOrder) throws BindingSetConversionException; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java index b2d04e1..4120fd9 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/BindingSetStringConverter.java @@ -19,29 +19,27 @@ package org.apache.rya.indexing.pcj.storage.accumulo; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; import java.util.ArrayList; import java.util.List; -import java.util.Set; - -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; +import org.apache.rya.api.domain.RyaType; +import org.apache.rya.api.resolver.RdfToRyaConversions; import org.openrdf.model.URI; import org.openrdf.model.Value; import org.openrdf.model.ValueFactory; import org.openrdf.model.impl.URIImpl; import org.openrdf.model.impl.ValueFactoryImpl; import org.openrdf.model.vocabulary.XMLSchema; -import org.openrdf.query.Binding; import org.openrdf.query.BindingSet; import org.openrdf.query.algebra.evaluation.QueryBindingSet; +import org.openrdf.query.impl.MapBindingSet; import com.google.common.base.Joiner; -import org.apache.rya.api.domain.RyaType; -import org.apache.rya.api.resolver.RdfToRyaConversions; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * Converts {@link BindingSet}s to Strings and back again. The Strings do not @@ -58,7 +56,8 @@ public class BindingSetStringConverter implements BindingSetConverter<String> { @Override public String convert(final BindingSet bindingSet, final VariableOrder varOrder) { - checkBindingsSubsetOfVarOrder(bindingSet, varOrder); + requireNonNull(bindingSet); + requireNonNull(varOrder); // Convert each Binding to a String. final List<String> bindingStrings = new ArrayList<>(); @@ -79,38 +78,26 @@ public class BindingSetStringConverter implements BindingSetConverter<String> { return Joiner.on(BINDING_DELIM).join(bindingStrings); } - /** - * Checks to see if the names of all the {@link Binding}s in the {@link BindingSet} - * are a subset of the variables names in {@link VariableOrder}. - * - * @param bindingSet - The binding set whose Bindings will be inspected. (not null) - * @param varOrder - The names of the bindings that may appear in the BindingSet. (not null) - * @throws IllegalArgumentException Indicates the names of the bindings are - * not a subset of the variable order. - */ - private static void checkBindingsSubsetOfVarOrder(final BindingSet bindingSet, final VariableOrder varOrder) throws IllegalArgumentException { - checkNotNull(bindingSet); - checkNotNull(varOrder); - - final Set<String> bindingNames = bindingSet.getBindingNames(); - final List<String> varNames = varOrder.getVariableOrders(); - checkArgument(varNames.containsAll(bindingNames), "The BindingSet contains a Binding whose name is not part of the VariableOrder."); - } - @Override public BindingSet convert(final String bindingSetString, final VariableOrder varOrder) { - checkNotNull(bindingSetString); - checkNotNull(varOrder); + requireNonNull(bindingSetString); + requireNonNull(varOrder); + + // If both are empty, return an empty binding set. + if(bindingSetString.isEmpty() && varOrder.toString().isEmpty()) { + return new MapBindingSet(); + } + // Otherwise parse it. final String[] bindingStrings = bindingSetString.split(BINDING_DELIM); - final String[] varOrrderArr = varOrder.toArray(); - checkArgument(varOrrderArr.length == bindingStrings.length, "The number of Bindings must match the length of the VariableOrder."); + final String[] varOrderArr = varOrder.toArray(); + checkArgument(varOrderArr.length == bindingStrings.length, "The number of Bindings must match the length of the VariableOrder."); final QueryBindingSet bindingSet = new QueryBindingSet(); for(int i = 0; i < bindingStrings.length; i++) { final String bindingString = bindingStrings[i]; if(!NULL_VALUE_STRING.equals(bindingString)) { - final String name = varOrrderArr[i]; + final String name = varOrderArr[i]; final Value value = toValue(bindingStrings[i]); bindingSet.addBinding(name, value); } @@ -125,7 +112,7 @@ public class BindingSetStringConverter implements BindingSetConverter<String> { * @return The {@link Value} representation of the String. */ protected static Value toValue(final String valueString) { - checkNotNull(valueString); + requireNonNull(valueString); // Split the String that was stored in Fluo into its Value and Type parts. final String[] valueAndType = valueString.split(TYPE_DELIM); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java index ce3e5d1..5d13597 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/PcjTables.java @@ -30,9 +30,6 @@ import java.util.List; import java.util.Map.Entry; import java.util.Set; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; @@ -59,6 +56,7 @@ import org.apache.accumulo.core.security.ColumnVisibility; import org.apache.hadoop.io.Text; import org.apache.log4j.Logger; import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.openrdf.query.BindingSet; @@ -72,6 +70,9 @@ import org.openrdf.repository.RepositoryException; import com.google.common.base.Optional; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * Functions that create and maintain the PCJ tables that are used by Rya. */ @@ -157,6 +158,7 @@ public class PcjTables { final TableOperations tableOps = accumuloConn.tableOperations(); if(!tableOps.exists(pcjTableName)) { + BatchWriter writer = null; try { // Create the new table in Accumulo. tableOps.create(pcjTableName); @@ -165,14 +167,21 @@ public class PcjTables { final PcjMetadata pcjMetadata = new PcjMetadata(sparql, 0L, varOrders); final List<Mutation> mutations = makeWriteMetadataMutations(pcjMetadata); - final BatchWriter writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig()); + writer = accumuloConn.createBatchWriter(pcjTableName, new BatchWriterConfig()); writer.addMutations(mutations); - writer.close(); } catch (final TableExistsException e) { log.warn("Something else just created the Rya PCJ export table named '" + pcjTableName + "'. This is unexpected, but we will continue as normal."); } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) { throw new PCJStorageException("Could not create a new PCJ named: " + pcjTableName, e); + } finally { + if(writer != null) { + try { + writer.close(); + } catch (final MutationsRejectedException e) { + log.error("Mutations rejected while creating the PCJ table.", e); + } + } } } } @@ -231,9 +240,10 @@ public class PcjTables { checkNotNull(accumuloConn); checkNotNull(pcjTableName); + Scanner scanner = null; try { // Create an Accumulo scanner that iterates through the metadata entries. - final Scanner scanner = accumuloConn.createScanner(pcjTableName, new Authorizations()); + scanner = accumuloConn.createScanner(pcjTableName, new Authorizations()); final Iterator<Entry<Key, Value>> entries = scanner.iterator(); // No metadata has been stored in the table yet. @@ -266,6 +276,10 @@ public class PcjTables { } catch (final TableNotFoundException e) { throw new PCJStorageException("Could not add results to a PCJ because the PCJ table does not exist.", e); + } finally { + if(scanner != null) { + scanner.close(); + } } } @@ -310,7 +324,7 @@ public class PcjTables { * results for the PCJ. * @throws PCJStorageException The binding sets could not be fetched. */ - public Iterable<BindingSet> listResults(final Connector accumuloConn, final String pcjTableName, final Authorizations auths) throws PCJStorageException { + public CloseableIterator<BindingSet> listResults(final Connector accumuloConn, final String pcjTableName, final Authorizations auths) throws PCJStorageException { requireNonNull(pcjTableName); // Fetch the Variable Orders for the binding sets and choose one of them. It @@ -324,12 +338,7 @@ public class PcjTables { scanner.fetchColumnFamily( new Text(varOrder.toString()) ); // Return an Iterator that uses that scanner. - return new Iterable<BindingSet>() { - @Override - public Iterator<BindingSet> iterator() { - return new ScannerBindingSetIterator(scanner, varOrder); - } - }; + return new ScannerBindingSetIterator(scanner, varOrder); } catch (final TableNotFoundException e) { throw new PCJStorageException(String.format("PCJ Table does not exist for name '%s'.", pcjTableName), e); @@ -398,10 +407,10 @@ public class PcjTables { for(final VariableOrder varOrder : varOrders) { try { // Serialize the result to the variable order. - final byte[] serializedResult = converter.convert(result, varOrder); + final byte[] rowKey = converter.convert(result, varOrder); // Row ID = binding set values, Column Family = variable order of the binding set. - final Mutation addResult = new Mutation(serializedResult); + final Mutation addResult = new Mutation(rowKey); final String visibility = result.getVisibility(); addResult.put(varOrder.toString(), "", new ColumnVisibility(visibility), ""); mutations.add(addResult); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java index d0fd7bf..26fd8c9 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/ScannerBindingSetIterator.java @@ -20,27 +20,30 @@ package org.apache.rya.indexing.pcj.storage.accumulo; import static java.util.Objects.requireNonNull; +import java.io.IOException; import java.util.Iterator; import java.util.Map.Entry; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.CloseableIterator; import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; import org.openrdf.query.BindingSet; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * Iterates over the results of a {@link Scanner} assuming the results are * binding sets that can be converted using a {@link AccumuloPcjSerializer}. */ @DefaultAnnotation(NonNull.class) -public class ScannerBindingSetIterator implements Iterator<BindingSet> { +public class ScannerBindingSetIterator implements CloseableIterator<BindingSet> { private static final AccumuloPcjSerializer converter = new AccumuloPcjSerializer(); + private final Scanner scanner; private final Iterator<Entry<Key, Value>> accEntries; private final VariableOrder varOrder; @@ -51,7 +54,7 @@ public class ScannerBindingSetIterator implements Iterator<BindingSet> { * @param varOrder - The variable order of the binding sets the scanner returns. (not null) */ public ScannerBindingSetIterator(final Scanner scanner, final VariableOrder varOrder) { - requireNonNull(scanner); + this.scanner = requireNonNull(scanner); this.accEntries = scanner.iterator(); this.varOrder = requireNonNull(varOrder); } @@ -71,4 +74,9 @@ public class ScannerBindingSetIterator implements Iterator<BindingSet> { throw new RuntimeException("Could not deserialize a BindingSet from Accumulo.", e); } } + + @Override + public void close() throws IOException { + scanner.close(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java index 6ec801e..151db50 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/accumulo/VariableOrder.java @@ -23,15 +23,15 @@ import static com.google.common.base.Preconditions.checkNotNull; import java.util.Collection; import java.util.Iterator; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; -import net.jcip.annotations.Immutable; - import org.openrdf.query.BindingSet; import com.google.common.base.Joiner; import com.google.common.collect.ImmutableList; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; +import net.jcip.annotations.Immutable; + /** * An ordered list of {@link BindingSet} variable names. These are used to * specify the order {@link Binding}s within the set are serialized to Accumulo. @@ -46,6 +46,13 @@ public final class VariableOrder implements Iterable<String> { private final ImmutableList<String> variableOrder; /** + * Constructs an instance of {@link VariableOrder} when there are no variables. + */ + public VariableOrder() { + variableOrder = ImmutableList.of(); + } + + /** * Constructs an instance of {@link VariableOrder}. * * @param varOrder - An ordered array of Binding Set variables. (not null) http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c941aea8/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java index 2baa52e..f67110e 100644 --- a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/update/PrecomputedJoinUpdater.java @@ -20,19 +20,18 @@ package org.apache.rya.indexing.pcj.update; import java.util.Collection; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - +import org.apache.rya.api.domain.RyaStatement; import org.apache.rya.indexing.pcj.storage.PcjException; -import org.apache.rya.api.domain.RyaStatement; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; /** * Updates the state of all PCJ indices whenever {@link RyaStatement}s are * added to or removed from the system. */ @DefaultAnnotation(NonNull.class) -public interface PrecomputedJoinUpdater { +public interface PrecomputedJoinUpdater extends AutoCloseable { /** * The PCJ indices will be updated to include new statements within @@ -80,6 +79,7 @@ public interface PrecomputedJoinUpdater { * * @throws PcjUpdateException The updater could not be closed. */ + @Override public void close() throws PcjUpdateException; /**
