http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorage.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorage.java b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorage.java new file mode 100644 index 0000000..f4e4e9e --- /dev/null +++ b/extras/rya.indexing.pcj/src/main/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorage.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.mongo; + +import static java.util.Objects.requireNonNull; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import org.apache.rya.api.instance.RyaDetailsUpdater; +import org.apache.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.PCJIdFactory; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; +import org.openrdf.query.BindingSet; + +import com.mongodb.MongoClient; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * A MongoDB backed implementation of {@link PrecomputedJoinStorage}. PCJ IDs are recorded in the instance's {@link RyaDetails}; metadata and result operations are delegated to {@link MongoPcjDocuments}. + */ +@DefaultAnnotation(NonNull.class) +public class MongoPcjStorage implements PrecomputedJoinStorage { + public static final String PCJ_COLLECTION_NAME = "pcjs"; + // Used to update the instance's metadata. + private final MongoRyaInstanceDetailsRepository ryaDetailsRepo; + + private final String ryaInstanceName; + + // Factory that generates the IDs of new PCJs. + private final PCJIdFactory pcjIdFactory = new PCJIdFactory(); + + private final MongoPcjDocuments pcjDocs; + + /** + * Constructs an instance of {@link MongoPcjStorage}. + * + * @param client - The {@link MongoClient} that will be used to connect to MongoDB. (not null) + * @param ryaInstanceName - The name of the Rya instance that will be accessed. 
(not null) + */ + public MongoPcjStorage(final MongoClient client, final String ryaInstanceName) { + requireNonNull(client); + this.ryaInstanceName = requireNonNull(ryaInstanceName); + pcjDocs = new MongoPcjDocuments(client, ryaInstanceName); + ryaDetailsRepo = new MongoRyaInstanceDetailsRepository(client, ryaInstanceName); + } + + @Override + public String createPcj(final String sparql) throws PCJStorageException { + requireNonNull(sparql); + + // Generate an ID and register it in the Rya Details for this instance + // before the PCJ itself is created. + final String pcjId = pcjIdFactory.nextId(); + + try { + new RyaDetailsUpdater(ryaDetailsRepo).update(originalDetails -> { + // Create the new PCJ's details. + final PCJDetails.Builder newPcjDetails = PCJDetails.builder().setId(pcjId); + + // Add them to the instance's details. + final RyaDetails.Builder mutated = RyaDetails.builder(originalDetails); + mutated.getPCJIndexDetails().addPCJDetails(newPcjDetails); + return mutated.build(); + }); + } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) { + throw new PCJStorageException(String.format("Could not create a new PCJ for Rya instance '%s' " + + "because of a problem while updating the instance's details.", ryaInstanceName), e); + } + + // Have MongoPcjDocuments create the stored PCJ for this ID and SPARQL query. + pcjDocs.createPcj(pcjId, sparql); + + // NOTE(review): nothing in this method grants per-user access to the new + // PCJ; it only returns the generated ID. 
+ return pcjId; + } + + + @Override + public PcjMetadata getPcjMetadata(final String pcjId) throws PCJStorageException { + requireNonNull(pcjId); + return pcjDocs.getPcjMetadata(pcjId); + } + + @Override + public void addResults(final String pcjId, final Collection<VisibilityBindingSet> results) throws PCJStorageException { + requireNonNull(pcjId); + requireNonNull(results); + pcjDocs.addResults(pcjId, results); + } + + + @Override + public CloseableIterator<BindingSet> listResults(final String pcjId) throws PCJStorageException { + requireNonNull(pcjId); + // Delegate to MongoPcjDocuments to iterate over the PCJ's stored results. + return pcjDocs.listResults(pcjId); + } + + @Override + public void purge(final String pcjId) throws PCJStorageException { + requireNonNull(pcjId); + pcjDocs.purgePcjs(pcjId); + } + + @Override + public void dropPcj(final String pcjId) throws PCJStorageException { + requireNonNull(pcjId); + + // Update the Rya Details for this instance to no longer include the + // PCJ. + try { + new RyaDetailsUpdater(ryaDetailsRepo).update(originalDetails -> { + // Drop the PCJ's metadata from the instance's metadata. + final RyaDetails.Builder mutated = RyaDetails.builder(originalDetails); + mutated.getPCJIndexDetails().removePCJDetails(pcjId); + return mutated.build(); + }); + } catch (final RyaDetailsRepositoryException | CouldNotApplyMutationException e) { + throw new PCJStorageException(String.format("Could not drop an existing PCJ for Rya instance '%s' " + + "because of a problem while updating the instance's details.", ryaInstanceName), e); + } + + // Delete the stored PCJ (delegated to MongoPcjDocuments). 
+ pcjDocs.dropPcj(pcjId); + } + + @Override + public List<String> listPcjs() throws PCJStorageException { + try { + final RyaDetails details = ryaDetailsRepo.getRyaInstanceDetails(); + final PCJIndexDetails pcjIndexDetails = details.getPCJIndexDetails(); + final List<String> pcjIds = new ArrayList<>(pcjIndexDetails.getPCJDetails().keySet()); + return pcjIds; + } catch (final RyaDetailsRepositoryException e) { + throw new PCJStorageException("Could not check to see if RyaDetails exist for the instance.", e); + } + } + + @Override + public void close() throws PCJStorageException { + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocumentsTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocumentsTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocumentsTest.java new file mode 100644 index 0000000..f522fac --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjDocumentsTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.mongo; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.mongodb.MongoITBase; +import org.bson.Document; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + +public class MongoPcjDocumentsTest extends MongoITBase { + @Test + public void pcjToMetadata() throws Exception { + final MongoPcjDocuments docConverter = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final Document actual = docConverter.makeMetadataDocument("pcjTest", sparql); + final Document expected = new Document() + .append(MongoPcjDocuments.CARDINALITY_FIELD, 0) + .append(MongoPcjDocuments.PCJ_METADATA_ID, "pcjTest_METADATA") + .append(MongoPcjDocuments.SPARQL_FIELD, sparql) + .append(MongoPcjDocuments.VAR_ORDER_FIELD, Sets.newHashSet(new VariableOrder("a", "b"), new VariableOrder("b", "a"))); + assertEquals(expected, actual); + } + + @Test + public void metadataExists() throws Exception { + final List<VariableOrder> varOrders = Lists.newArrayList(new VariableOrder("b", "a"), new VariableOrder("a", "b")); + final MongoPcjDocuments docConverter = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + docConverter.createPcj("pcjTest", sparql); + + PcjMetadata actual = docConverter.getPcjMetadata("pcjTest"); + PcjMetadata expected = new PcjMetadata(sparql, 0, varOrders); + assertEquals(expected, actual); + + // Set up the first binding set that will be added to the PCJ as a result. 
+ final MapBindingSet originalBindingSet1 = new MapBindingSet(); + originalBindingSet1.addBinding("x", new URIImpl("http://a")); + originalBindingSet1.addBinding("y", new URIImpl("http://b")); + originalBindingSet1.addBinding("z", new URIImpl("http://c")); + final VisibilityBindingSet results1 = new VisibilityBindingSet(originalBindingSet1, "A&B&C"); + + // Set up the second binding set that will be added to the PCJ as a result. + final MapBindingSet originalBindingSet2 = new MapBindingSet(); + originalBindingSet2.addBinding("x", new URIImpl("http://1")); + originalBindingSet2.addBinding("y", new URIImpl("http://2")); + originalBindingSet2.addBinding("z", new URIImpl("http://3")); + final VisibilityBindingSet results2 = new VisibilityBindingSet(originalBindingSet2, "A&B&C"); + + final List<VisibilityBindingSet> bindingSets = new ArrayList<>(); + bindingSets.add(results1); + bindingSets.add(results2); + + docConverter.addResults("pcjTest", bindingSets); + actual = docConverter.getPcjMetadata("pcjTest"); + expected = new PcjMetadata(sparql, 2, varOrders); + assertEquals(expected, actual); + + docConverter.purgePcjs("pcjTest"); + actual = docConverter.getPcjMetadata("pcjTest"); + expected = new PcjMetadata(sparql, 0, varOrders); + assertEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorageIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorageIT.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorageIT.java new file mode 100644 index 0000000..6747558 --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/MongoPcjStorageIT.java @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.indexing.pcj.storage.mongo; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.rya.api.instance.RyaDetails; +import org.apache.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import org.apache.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import org.apache.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails; +import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import org.apache.rya.api.instance.RyaDetails.ProspectorDetails; +import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.apache.rya.indexing.pcj.storage.accumulo.ShiftVarOrderFactory; +import 
org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.mongodb.MongoITBase; +import org.apache.rya.mongodb.instance.MongoRyaInstanceDetailsRepository; +import org.junit.Test; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; + +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; + +/** + * Integration tests the methods of {@link MongoPcjStorage}. + * <p> + * These tests ensure that the stored PCJs are maintained and that these operations + * also update the Rya instance's details. + */ +public class MongoPcjStorageIT extends MongoITBase { + + @Test + public void createPCJ() throws Exception { + // Setup the PCJ storage that will be tested against. + try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) { + // Initialize the Rya instance's details, then create a PCJ. + final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName()); + detailsRepo.initialize( + RyaDetails.builder() + .setRyaInstanceName(conf.getRyaInstanceName()) + .setRyaVersion("test") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false)) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(false)) + .setProspectorDetails(new ProspectorDetails(Optional.absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent())) + .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true)) + .build() + ); + final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + + // Ensure the Rya details have been updated to include the PCJ's ID. 
+ final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails() + .getPCJIndexDetails() + .getPCJDetails(); + + final PCJDetails expectedDetails = PCJDetails.builder() + .setId( pcjId ) + .build(); + + assertEquals(expectedDetails, detailsMap.get(pcjId)); + } + } + + + @Test + public void dropPCJ() throws Exception { + try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) { + final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName()); + detailsRepo.initialize( + RyaDetails.builder() + .setRyaInstanceName(conf.getRyaInstanceName()) + .setRyaVersion("test") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false)) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(false)) + .setProspectorDetails(new ProspectorDetails(Optional.absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent())) + .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true)) + .build() + ); + // Create a PCJ. + final String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + + // Delete the PCJ that was just created. + pcjStorage.dropPcj(pcjId); + + // Ensure the Rya details have been updated to no longer include the PCJ's ID. 
+ + final ImmutableMap<String, PCJDetails> detailsMap = detailsRepo.getRyaInstanceDetails() + .getPCJIndexDetails() + .getPCJDetails(); + + assertFalse( detailsMap.containsKey(pcjId) ); + } + } + + @Test + public void listPcjs() throws Exception { + try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) { + final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName()); + detailsRepo.initialize( + RyaDetails.builder() + .setRyaInstanceName(conf.getRyaInstanceName()) + .setRyaVersion("test") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false)) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(false)) + .setProspectorDetails(new ProspectorDetails(Optional.absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent())) + .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true)) + .build() + ); + // Create a few PCJs and hold onto their IDs. + final List<String> expectedIds = new ArrayList<>(); + + String pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + expectedIds.add( pcjId ); + + pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + expectedIds.add( pcjId ); + + pcjId = pcjStorage.createPcj("SELECT * WHERE { ?a <http://isA> ?b } "); + expectedIds.add( pcjId ); + + // Fetch the PCJ IDs. + final List<String> pcjIds = pcjStorage.listPcjs(); + + // Ensure the expected IDs match the fetched IDs. 
+ Collections.sort(expectedIds); + Collections.sort(pcjIds); + assertEquals(expectedIds, pcjIds); + } + } + + @Test + public void getPcjMetadata() throws Exception { + try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) { + final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName()); + detailsRepo.initialize( + RyaDetails.builder() + .setRyaInstanceName(conf.getRyaInstanceName()) + .setRyaVersion("test") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false)) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(false)) + .setProspectorDetails(new ProspectorDetails(Optional.absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent())) + .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true)) + .build() + ); + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Fetch the PCJ's metadata. + final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + + // Ensure it has the expected values. 
+ final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); + assertEquals(expectedMetadata, metadata); + } + } + + @Test + public void addResults() throws Exception { + try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) { + final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName()); + detailsRepo.initialize( + RyaDetails.builder() + .setRyaInstanceName(conf.getRyaInstanceName()) + .setRyaVersion("test") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false)) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(false)) + .setProspectorDetails(new ProspectorDetails(Optional.absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent())) + .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true)) + .build() + ); + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Add some binding sets to it. + final Set<VisibilityBindingSet> results = new HashSet<>(); + + final MapBindingSet aliceBS = new MapBindingSet(); + aliceBS.addBinding("a", new URIImpl("http://Alice")); + aliceBS.addBinding("b", new URIImpl("http://Person")); + results.add( new VisibilityBindingSet(aliceBS, "") ); + + final MapBindingSet charlieBS = new MapBindingSet(); + charlieBS.addBinding("a", new URIImpl("http://Charlie")); + charlieBS.addBinding("b", new URIImpl("http://Comedian")); + results.add( new VisibilityBindingSet(charlieBS, "") ); + + pcjStorage.addResults(pcjId, results); + + // Make sure the PCJ metadata was updated. 
+ final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 2L, varOrders); + assertEquals(expectedMetadata, metadata); + } + } + + @Test + public void listResults() throws Exception { + try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) { + final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName()); + detailsRepo.initialize( + RyaDetails.builder() + .setRyaInstanceName(conf.getRyaInstanceName()) + .setRyaVersion("test") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false)) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(false)) + .setProspectorDetails(new ProspectorDetails(Optional.absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent())) + .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true)) + .build() + ); + // Create a PCJ. + final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Add some binding sets to it. 
+ final Set<VisibilityBindingSet> visiSets = new HashSet<>(); + final Set<BindingSet> expectedResults = new HashSet<>(); + + final MapBindingSet aliceBS = new MapBindingSet(); + aliceBS.addBinding("a", new URIImpl("http://Alice")); + aliceBS.addBinding("b", new URIImpl("http://Person")); + visiSets.add( new VisibilityBindingSet(aliceBS, "") ); + expectedResults.add(aliceBS); + + final MapBindingSet charlieBS = new MapBindingSet(); + charlieBS.addBinding("a", new URIImpl("http://Charlie")); + charlieBS.addBinding("b", new URIImpl("http://Comedian")); + visiSets.add( new VisibilityBindingSet(charlieBS, "") ); + expectedResults.add(charlieBS); + + pcjStorage.addResults(pcjId, visiSets); + + // List the results that were stored. + final Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } + + assertEquals(expectedResults, results); + } + } + + @Test + public void purge() throws Exception { + try(final PrecomputedJoinStorage pcjStorage = new MongoPcjStorage(getMongoClient(), conf.getRyaInstanceName())) { + final MongoRyaInstanceDetailsRepository detailsRepo = new MongoRyaInstanceDetailsRepository(getMongoClient(), conf.getRyaInstanceName()); + detailsRepo.initialize( + RyaDetails.builder() + .setRyaInstanceName(conf.getRyaInstanceName()) + .setRyaVersion("test") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(false)) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(false)) + .setProspectorDetails(new ProspectorDetails(Optional.absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.absent())) + .setPCJIndexDetails(PCJIndexDetails.builder().setEnabled(true)) + .build() + ); + // Create a PCJ. 
+ final String sparql = "SELECT * WHERE { ?a <http://isA> ?b }"; + final String pcjId = pcjStorage.createPcj(sparql); + + // Add some binding sets to it. + final Set<VisibilityBindingSet> expectedResults = new HashSet<>(); + + final MapBindingSet aliceBS = new MapBindingSet(); + aliceBS.addBinding("a", new URIImpl("http://Alice")); + aliceBS.addBinding("b", new URIImpl("http://Person")); + expectedResults.add( new VisibilityBindingSet(aliceBS, "") ); + + final MapBindingSet charlieBS = new MapBindingSet(); + charlieBS.addBinding("a", new URIImpl("http://Charlie")); + charlieBS.addBinding("b", new URIImpl("http://Comedian")); + expectedResults.add( new VisibilityBindingSet(charlieBS, "") ); + + pcjStorage.addResults(pcjId, expectedResults); + + // Purge the PCJ. + pcjStorage.purge(pcjId); + + // List whatever results remain after the purge. + final Set<BindingSet> results = new HashSet<>(); + try(CloseableIterator<BindingSet> resultsIt = pcjStorage.listResults(pcjId)) { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } + + assertTrue( results.isEmpty() ); + + // Make sure the PCJ metadata was updated. 
+ final PcjMetadata metadata = pcjStorage.getPcjMetadata(pcjId); + + final Set<VariableOrder> varOrders = new ShiftVarOrderFactory().makeVarOrders(sparql); + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, varOrders); + assertEquals(expectedMetadata, metadata); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsIntegrationTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsIntegrationTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsIntegrationTest.java new file mode 100644 index 0000000..0c71c9f --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsIntegrationTest.java @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.indexing.pcj.storage.mongo; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.rya.api.model.VisibilityBindingSet; +import org.apache.rya.api.utils.CloseableIterator; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.BindingSetConverter.BindingSetConversionException; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTables; +import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.MongoDBRyaDAO; +import org.apache.rya.mongodb.MongoITBase; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.RdfCloudTripleStore; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.NumericLiteralImpl; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.query.BindingSet; +import org.openrdf.query.impl.MapBindingSet; +import org.openrdf.repository.RepositoryConnection; +import org.openrdf.repository.sail.SailRepositoryConnection; + +import com.google.common.base.Optional; 
+import com.google.common.collect.Sets; + +/** + * Performs integration test using {@link MiniAccumuloCluster} to ensure the + * functions of {@link PcjTables} work within a cluster setting. + */ +public class PcjDocumentsIntegrationTest extends MongoITBase { + @Override + protected void updateConfiguration(final MongoDBRdfConfiguration conf) { + conf.setDisplayQueryPlan(true); + } + + /** + * Ensure that when a new PCJ table is created, it is initialized with the + * correct metadata values. + * <p> + * The method being tested is {@link PcjTables#createPcjTable(Connector, String, Set, String)} + */ + @Test + public void createPcjTable() throws PcjException, AccumuloException, AccumuloSecurityException { + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = "testPcj"; + final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + pcjs.createPcj(pcjTableName, sparql); + + // Fetch the PcjMetadata and ensure it has the correct values. + final PcjMetadata pcjMetadata = pcjs.getPcjMetadata(pcjTableName); + + // Ensure the metadata matches the expected value. + final PcjMetadata expected = new PcjMetadata(sparql, 0L, Sets.newHashSet(new VariableOrder("name", "age"), new VariableOrder("age", "name"))); + assertEquals(expected, pcjMetadata); + } + + /** + * Ensure when results have been written to the PCJ table that they are in Accumulo. + * <p> + * The method being tested is {@link PcjTables#addResults(Connector, String, java.util.Collection)} + */ + @Test + public void addResults() throws Exception { + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." 
+ + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = "testPcj"; + final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + pcjs.createPcj(pcjTableName, sparql); + + // Add a few results to the PCJ table. + final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie); + pcjs.addResults(pcjTableName, Sets.<VisibilityBindingSet>newHashSet( + new VisibilityBindingSet(alice), + new VisibilityBindingSet(bob), + new VisibilityBindingSet(charlie))); + + // Make sure the cardinality was updated. + final PcjMetadata metadata = pcjs.getPcjMetadata(pcjTableName); + assertEquals(3, metadata.getCardinality()); + + // Scan Accumulo for the stored results. + final Collection<BindingSet> fetchedResults = loadPcjResults(pcjTableName); + assertEquals(expected, fetchedResults); + } + + @Test + public void listResults() throws Exception { + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = "testPcj"; + final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + pcjs.createPcj(pcjTableName, sparql); + + // Add a few results to the PCJ table. 
+ final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + pcjs.addResults(pcjTableName, Sets.<VisibilityBindingSet>newHashSet( + new VisibilityBindingSet(alice), + new VisibilityBindingSet(bob), + new VisibilityBindingSet(charlie))); + + // Fetch the Binding Sets that have been stored in the PCJ table. + final Set<BindingSet> results = new HashSet<>(); + + final CloseableIterator<BindingSet> resultsIt = pcjs.listResults(pcjTableName); + try { + while(resultsIt.hasNext()) { + results.add( resultsIt.next() ); + } + } finally { + resultsIt.close(); + } + + // Verify the fetched results match the expected ones. + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie); + assertEquals(expected, results); + } + + /** + * Ensure when results are already stored in Rya, that we are able to populate + * the PCJ table for a new SPARQL query using those results. + * <p> + * The method being tested is: {@link PcjTables#populatePcj(Connector, String, RepositoryConnection, String)} + */ + @Test + public void populatePcj() throws Exception { + final MongoDBRyaDAO dao = new MongoDBRyaDAO(); + dao.setConf(new StatefulMongoDBRdfConfiguration(conf, getMongoClient())); + dao.init(); + final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); + ryaStore.setRyaDAO(dao); + ryaStore.initialize(); + final SailRepositoryConnection ryaConn = new RyaSailRepository(ryaStore).getConnection(); + ryaConn.begin(); + + try { + // Load some Triples into Rya. 
+ final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table that will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = "testPcj"; + final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + pcjs.createPcj(pcjTableName, sparql); + + // Populate the PCJ table using a Rya connection. + pcjs.populatePcj(pcjTableName, ryaConn); + + final Collection<BindingSet> fetchedResults = loadPcjResults(pcjTableName); + + // Make sure the cardinality was updated. 
+ final PcjMetadata metadata = pcjs.getPcjMetadata(pcjTableName); + assertEquals(3, metadata.getCardinality()); + + // Ensure the expected results match those that were stored. + final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie); + + assertEquals(expected, fetchedResults); + } finally { + ryaConn.close(); + ryaStore.shutDown(); + } + } + + /** + * Ensure the method that creates a new PCJ table, scans Rya for matches, and + * stores them in the PCJ table works. + * <p> + * The method being tested is: {@link PcjTables#createAndPopulatePcj(RepositoryConnection, Connector, String, String, String[], Optional)} + */ + @Test + public void createAndPopulatePcj() throws Exception { + final MongoDBRyaDAO dao = new MongoDBRyaDAO(); + dao.setConf(new StatefulMongoDBRdfConfiguration(conf, getMongoClient())); + dao.init(); + final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); + ryaStore.setRyaDAO(dao); + ryaStore.initialize(); + final SailRepositoryConnection ryaConn = new RyaSailRepository(ryaStore).getConnection(); + ryaConn.begin(); + + try { + // Load some Triples into Rya. 
+ final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table that will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = "testPcj"; + + // Create and populate the PCJ table. + final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + pcjs.createAndPopulatePcj(ryaConn, pcjTableName, sparql); + + // Make sure the cardinality was updated. + final PcjMetadata metadata = pcjs.getPcjMetadata(pcjTableName); + assertEquals(3, metadata.getCardinality()); + + // Scan Accumulo for the stored results. 
+ final Collection<BindingSet> fetchedResults = loadPcjResults(pcjTableName); + + // Ensure the expected results match those that were stored. + final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + final Set<BindingSet> expected = Sets.<BindingSet>newHashSet(alice, bob, charlie); + + assertEquals(expected, fetchedResults); + } finally { + ryaConn.close(); + ryaStore.shutDown(); + } + } + + @Test + public void listPcjs() throws Exception { + // Set up the table names that will be used. + final String instance1 = "instance1_"; + final String instance2 = "instance2_"; + + final String instance1_table1 = new PcjTableNameFactory().makeTableName(instance1, "table1"); + final String instance1_table2 = new PcjTableNameFactory().makeTableName(instance1, "table2"); + final String instance1_table3 = new PcjTableNameFactory().makeTableName(instance1, "table3"); + + final String instance2_table1 = new PcjTableNameFactory().makeTableName(instance2, "table1"); + + // Create the PCJ Tables that are in instance 1 and instance 2. 
+ final String sparql = "SELECT ?x WHERE { ?x <http://isA> <http://Food> }"; + + final MongoPcjDocuments pcjs1 = new MongoPcjDocuments(getMongoClient(), instance1); + final MongoPcjDocuments pcjs2 = new MongoPcjDocuments(getMongoClient(), instance2); + pcjs1.createPcj(instance1_table1, sparql); + pcjs1.createPcj(instance1_table2, sparql); + pcjs1.createPcj(instance1_table3, sparql); + + pcjs2.createPcj(instance2_table1, sparql); + + // Ensure all of the names have been stored for instance 1 and 2. + final Set<String> expected1 = Sets.newHashSet(instance1_table1, instance1_table2, instance1_table3); + final Set<String> instance1Tables = Sets.newHashSet( pcjs1.listPcjDocuments() ); + assertEquals(expected1, instance1Tables); + + final Set<String> expected2 = Sets.newHashSet(instance2_table1); + final Set<String> instance2Tables = Sets.newHashSet( pcjs2.listPcjDocuments() ); + assertEquals(expected2, instance2Tables); + } + + @Test + public void purge() throws Exception { + final String sparql = + "SELECT ?name ?age " + + "{" + + "FILTER(?age < 30) ." + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = "testPcj"; + final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + pcjs.createPcj(pcjTableName, sparql); + + // Add a few results to the PCJ table. 
+ final MapBindingSet alice = new MapBindingSet(); + alice.addBinding("name", new URIImpl("http://Alice")); + alice.addBinding("age", new NumericLiteralImpl(14, XMLSchema.INTEGER)); + + final MapBindingSet bob = new MapBindingSet(); + bob.addBinding("name", new URIImpl("http://Bob")); + bob.addBinding("age", new NumericLiteralImpl(16, XMLSchema.INTEGER)); + + final MapBindingSet charlie = new MapBindingSet(); + charlie.addBinding("name", new URIImpl("http://Charlie")); + charlie.addBinding("age", new NumericLiteralImpl(12, XMLSchema.INTEGER)); + + pcjs.addResults(pcjTableName, Sets.<VisibilityBindingSet>newHashSet( + new VisibilityBindingSet(alice), + new VisibilityBindingSet(bob), + new VisibilityBindingSet(charlie))); + + // Make sure the cardinality was updated. + PcjMetadata metadata = pcjs.getPcjMetadata(pcjTableName); + assertEquals(3, metadata.getCardinality()); + + // Purge the data. + pcjs.purgePcjs(pcjTableName); + + // Make sure the cardinality was updated to 0. + metadata = pcjs.getPcjMetadata(pcjTableName); + assertEquals(0, metadata.getCardinality()); + } + + @Test(expected=PCJStorageException.class) + public void dropPcj() throws Exception { + // Create a PCJ index. + final String pcjTableName = "testPcj"; + final String sparql = "SELECT x WHERE ?x <http://isA> <http://Food>"; + + final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + pcjs.createPcj(pcjTableName, sparql); + + // Fetch its metadata to show that it has actually been created. + final PcjMetadata expectedMetadata = new PcjMetadata(sparql, 0L, new ArrayList<VariableOrder>()); + PcjMetadata metadata = pcjs.getPcjMetadata(pcjTableName); + assertEquals(expectedMetadata, metadata); + + // Drop it. + pcjs.dropPcj(pcjTableName); + + // Show the metadata is no longer present. 
+ metadata = pcjs.getPcjMetadata(pcjTableName); + } + + private Collection<BindingSet> loadPcjResults(final String pcjTableName) throws PcjException, TableNotFoundException, BindingSetConversionException { + + // Get the variable orders the data was written to. + final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + final CloseableIterator<BindingSet> bindings = pcjs.listResults(pcjTableName); + final Set<BindingSet> bindingSets = new HashSet<>(); + while(bindings.hasNext()) { + bindingSets.add(bindings.next()); + } + return bindingSets; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsWithMockTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsWithMockTest.java b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsWithMockTest.java new file mode 100644 index 0000000..a3ba747 --- /dev/null +++ b/extras/rya.indexing.pcj/src/test/java/org/apache/rya/indexing/pcj/storage/mongo/PcjDocumentsWithMockTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.rya.indexing.pcj.storage.mongo; + +import static org.junit.Assert.assertEquals; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.rya.indexing.pcj.storage.PcjMetadata; +import org.apache.rya.indexing.pcj.storage.accumulo.PcjTableNameFactory; +import org.apache.rya.mongodb.MongoDBRdfConfiguration; +import org.apache.rya.mongodb.MongoDBRyaDAO; +import org.apache.rya.mongodb.MongoITBase; +import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration; +import org.apache.rya.rdftriplestore.RdfCloudTripleStore; +import org.apache.rya.rdftriplestore.RyaSailRepository; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.impl.LiteralImpl; +import org.openrdf.model.impl.NumericLiteralImpl; +import org.openrdf.model.impl.StatementImpl; +import org.openrdf.model.impl.URIImpl; +import org.openrdf.model.vocabulary.XMLSchema; +import org.openrdf.repository.sail.SailRepositoryConnection; + +public class PcjDocumentsWithMockTest extends MongoITBase { + @Override + protected void updateConfiguration(final MongoDBRdfConfiguration conf) { + conf.setDisplayQueryPlan(false); + } + + @Test + public void populatePcj() throws Exception { + final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); + final MongoDBRyaDAO dao = new MongoDBRyaDAO(); + dao.setConf(new StatefulMongoDBRdfConfiguration(conf, getMongoClient())); + dao.init(); + ryaStore.setRyaDAO(dao); + ryaStore.initialize(); + final SailRepositoryConnection ryaConn = new RyaSailRepository(ryaStore).getConnection(); + + try 
{ + // Load some Triples into Rya. + final Set<Statement> triples = new HashSet<>(); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://hasAge"), new NumericLiteralImpl(14, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Alice"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://hasAge"), new NumericLiteralImpl(16, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Bob"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://hasAge"), new NumericLiteralImpl(12, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Charlie"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://hasAge"), new NumericLiteralImpl(43, XMLSchema.INTEGER)) ); + triples.add( new StatementImpl(new URIImpl("http://Eve"), new URIImpl("http://playsSport"), new LiteralImpl("Soccer")) ); + + for(final Statement triple : triples) { + ryaConn.add(triple); + } + + // Create a PCJ table that will include those triples in its results. + final String sparql = + "SELECT ?name ?age " + + "{" + + "?name <http://hasAge> ?age." + + "?name <http://playsSport> \"Soccer\" " + + "}"; + + final String pcjTableName = new PcjTableNameFactory().makeTableName(conf.getRyaInstanceName(), "testPcj"); + final MongoPcjDocuments pcjs = new MongoPcjDocuments(getMongoClient(), conf.getRyaInstanceName()); + pcjs.createAndPopulatePcj(ryaConn, pcjTableName, sparql); + + // Make sure the cardinality was updated. 
+ final PcjMetadata metadata = pcjs.getPcjMetadata(pcjTableName); + assertEquals(4, metadata.getCardinality()); + } finally { + ryaConn.close(); + ryaStore.shutDown(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java index 7d6b241..ef5ab34 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/CreateDeleteIT.java @@ -132,7 +132,7 @@ public class CreateDeleteIT extends RyaExportITBase { // Register the PCJ with Rya. final RyaClient ryaClient = AccumuloRyaClientFactory.build(createConnectionDetails(), getAccumuloConnector()); - final String pcjId = ryaClient.getCreatePCJ().get().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet()); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql, Sets.newHashSet()); // Write the data to Rya. 
final SailRepositoryConnection ryaConn = getRyaSailRepository().getConnection(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java index 610f502..11415eb 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/integration/QueryIT.java @@ -556,7 +556,7 @@ public class QueryIT extends RyaExportITBase { final Set<BindingSet> expectedResults = new HashSet<>(); final long period = 1800000; - final long binId = (currentTime / period) * period; + final long binId = currentTime / period * period; MapBindingSet bs = new MapBindingSet(); bs.addBinding("id", vf.createLiteral("id_1", XMLSchema.STRING)); @@ -657,7 +657,7 @@ public class QueryIT extends RyaExportITBase { final Set<BindingSet> expectedResults = new HashSet<>(); final long period = 1800000; - final long binId = (currentTime / period) * period; + final long binId = currentTime / period * period; MapBindingSet bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("4", XMLSchema.INTEGER)); @@ -734,7 +734,7 @@ public class QueryIT extends RyaExportITBase { final Set<BindingSet> expectedResults = new HashSet<>(); final long period = 1800000; - final long binId = (currentTime / period) * period; + final long binId = currentTime / period * period; MapBindingSet bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); @@ -854,7 +854,7 @@ public class QueryIT extends RyaExportITBase 
{ final Set<BindingSet> expectedResults = new HashSet<>(); final long period = 1800000; - final long binId = (currentTime / period) * period; + final long binId = currentTime / period * period; MapBindingSet bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); @@ -935,7 +935,7 @@ public class QueryIT extends RyaExportITBase { final Set<BindingSet> expectedResults = new HashSet<>(); final long period = 1800000; - final long binId = (currentTime / period) * period; + final long binId = currentTime / period * period; MapBindingSet bs = new MapBindingSet(); bs.addBinding("total", vf.createLiteral("2", XMLSchema.INTEGER)); @@ -996,7 +996,7 @@ public class QueryIT extends RyaExportITBase { switch (strategy) { case RYA: - ryaClient.getCreatePCJ().get().createPCJ(getRyaInstanceName(), sparql); + ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); addStatementsAndWait(statements); // Fetch the value that is stored within the PCJ table. try (final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName())) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java index 90ed01a..45be971 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java +++ b/extras/rya.pcj.fluo/pcj.fluo.integration/src/test/java/org/apache/rya/indexing/pcj/fluo/visibility/PcjVisibilityIT.java @@ -104,7 +104,7 @@ public class PcjVisibilityIT extends RyaExportITBase { final RyaClient ryaClient 
= AccumuloRyaClientFactory.build(createConnectionDetails(), accumuloConn); - final String pcjId = ryaClient.getCreatePCJ().get().createPCJ(getRyaInstanceName(), sparql); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); // Grant the root user the "u" authorization. super.getAccumuloConnector().securityOperations().changeUserAuthorizations(getUsername(), new Authorizations("u")); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java ---------------------------------------------------------------------- diff --git a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java index 3b1b160..465e089 100644 --- a/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java +++ b/extras/rya.pcj.fluo/pcj.fluo.test.base/src/main/java/org/apache/rya/pcj/fluo/test/base/KafkaExportITBase.java @@ -342,7 +342,7 @@ public class KafkaExportITBase extends AccumuloExportITBase { final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(ACCUMULO_USER, ACCUMULO_PASSWORD.toCharArray(), accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn); - final String pcjId = ryaClient.getCreatePCJ().get().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA)); + final String pcjId = ryaClient.getCreatePCJ().createPCJ(RYA_INSTANCE_NAME, sparql, Sets.newHashSet(ExportStrategy.KAFKA)); loadData(statements); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java ---------------------------------------------------------------------- diff 
--git a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java index 894421a..f540a2e 100644 --- a/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java +++ b/extras/rya.pcj.fluo/rya.pcj.functions.geo/src/test/java/org/apache/rya/indexing/pcj/functions/geo/GeoFunctionsIT.java @@ -342,7 +342,7 @@ public class GeoFunctionsIT extends RyaExportITBase { accInstance.getInstanceName(), accInstance.getZooKeepers()), accumuloConn); - ryaClient.getCreatePCJ().get().createPCJ(getRyaInstanceName(), sparql); + ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql); // Write the data to Rya. final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java index ef33df1..17a2a23 100644 --- a/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java +++ b/extras/shell/src/main/java/org/apache/rya/shell/RyaAdminCommands.java @@ -166,11 +166,34 @@ public class RyaAdminCommands implements CommandMarker { */ @CliAvailabilityIndicator({ CREATE_PCJ_CMD, - DELETE_PCJ_CMD, + DELETE_PCJ_CMD}) + public boolean arePCJCommandsAvailable() { + // The PCJ commands are only available if the Shell is connected to an instance of Rya + // that is new enough to use the RyaDetailsRepository and is configured to maintain PCJs. 
+ final ShellState shellState = state.getShellState(); + if(shellState.getConnectionState() == ConnectionState.CONNECTED_TO_INSTANCE) { + final GetInstanceDetails getInstanceDetails = shellState.getConnectedCommands().get().getGetInstanceDetails(); + final String ryaInstanceName = state.getShellState().getRyaInstanceName().get(); + try { + final Optional<RyaDetails> instanceDetails = getInstanceDetails.getDetails( ryaInstanceName ); + if(instanceDetails.isPresent()) { + return instanceDetails.get().getPCJIndexDetails().isEnabled(); + } + } catch (final RyaClientException e) { + return false; + } + } + return false; + } + + /** + * Enables commands that are available when the Shell is connected to a Rya Instance that supports PCJ Indexing. + */ + @CliAvailabilityIndicator({ CREATE_PERIODIC_PCJ_CMD, DELETE_PERIODIC_PCJ_CMD, LIST_INCREMENTAL_QUERIES}) - public boolean arePCJCommandsAvailable() { + public boolean arePeriodicPCJCommandsAvailable() { // The PCJ commands are only available if the Shell is connected to an instance of Rya // that is new enough to use the RyaDetailsRepository and is configured to maintain PCJs. 
final ShellState shellState = state.getShellState(); @@ -267,8 +290,8 @@ public class RyaAdminCommands implements CommandMarker { final boolean enableFreeTextIndex, // TODO RYA-215 -// @CliOption(key = {"enableGeospatialIndex"}, mandatory = false, help = "Use Geospatial Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true") -// final boolean enableGeospatialIndex, + // @CliOption(key = {"enableGeospatialIndex"}, mandatory = false, help = "Use Geospatial Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true") + // final boolean enableGeospatialIndex, @CliOption(key = {"enableTemporalIndex"}, mandatory = false, help = "Use Temporal Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true") final boolean enableTemporalIndex, @@ -289,7 +312,7 @@ public class RyaAdminCommands implements CommandMarker { .setEnableEntityCentricIndex(enableEntityCentricIndex) .setEnableFreeTextIndex(enableFreeTextIndex) // TODO RYA-215 -// .setEnableGeoIndex(enableGeospatialIndex) + // .setEnableGeoIndex(enableGeospatialIndex) .setEnableTemporalIndex(enableTemporalIndex) .setEnablePcjIndex(enablePcjIndex) .setFluoPcjAppName(fluoPcjAppName) @@ -320,11 +343,14 @@ public class RyaAdminCommands implements CommandMarker { final boolean enableFreeTextIndex, // TODO RYA-215 -// @CliOption(key = {"enableGeospatialIndex"}, mandatory = false, help = "Use Geospatial Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true") -// final boolean enableGeospatialIndex, + // @CliOption(key = {"enableGeospatialIndex"}, mandatory = false, help = "Use Geospatial Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true") + // final boolean enableGeospatialIndex, @CliOption(key = {"enableTemporalIndex"}, mandatory = false, help = "Use Temporal Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true") - final boolean enableTemporalIndex) { + final boolean enableTemporalIndex, + + 
@CliOption(key = {"enablePcjIndex"}, mandatory = false, help = "Use Precomputed Join (PCJ) Indexing.", unspecifiedDefaultValue = "false", specifiedDefaultValue = "true") + final boolean enablePcjIndex) { // Fetch the commands that are connected to the store. final RyaClient commands = state.getShellState().getConnectedCommands().get(); @@ -333,8 +359,9 @@ public class RyaAdminCommands implements CommandMarker { final InstallConfiguration installConfig = InstallConfiguration.builder() .setEnableFreeTextIndex(enableFreeTextIndex) // TODO RYA-215 -// .setEnableGeoIndex(enableGeospatialIndex) + // .setEnableGeoIndex(enableGeospatialIndex) .setEnableTemporalIndex(enableTemporalIndex) + .setEnablePcjIndex(enablePcjIndex) .build(); // Verify the configuration is what the user actually wants to do. @@ -401,7 +428,7 @@ public class RyaAdminCommands implements CommandMarker { final Optional<String> sparql = sparqlPrompt.getSparql(); if (sparql.isPresent()) { // Execute the command. - final String pcjId = commands.getCreatePCJ().get().createPCJ(ryaInstance, sparql.get(), strategies); + final String pcjId = commands.getCreatePCJ().createPCJ(ryaInstance, sparql.get(), strategies); // Return a message that indicates the ID of the newly created ID. return String.format("The PCJ has been created. Its ID is '%s'.", pcjId); } else { @@ -425,7 +452,7 @@ public class RyaAdminCommands implements CommandMarker { try { // Execute the command. 
- commands.getDeletePCJ().get().deletePCJ(ryaInstance, pcjId); + commands.getDeletePCJ().deletePCJ(ryaInstance, pcjId); return "The PCJ has been deleted."; } catch (final InstanceDoesNotExistException e) { http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java b/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java index e92eadd..31480db 100644 --- a/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java +++ b/extras/shell/src/main/java/org/apache/rya/shell/util/InstallPrompt.java @@ -108,14 +108,14 @@ public interface InstallPrompt { checkState(storageType.isPresent(), "The shell must be connected to a storage to use the install prompt."); switch(sharedShellState.getShellState().getStorageType().get()) { - case ACCUMULO: - return promptAccumuloVerified(instanceName, installConfig); + case ACCUMULO: + return promptAccumuloVerified(instanceName, installConfig); - case MONGO: - return promptMongoVerified(instanceName, installConfig); + case MONGO: + return promptMongoVerified(instanceName, installConfig); - default: - throw new IllegalStateException("Unsupported storage type: " + storageType.get()); + default: + throw new IllegalStateException("Unsupported storage type: " + storageType.get()); } } @@ -143,9 +143,9 @@ public interface InstallPrompt { final boolean enableFreeTextIndexing = promptBoolean(prompt, Optional.of(true)); builder.setEnableFreeTextIndex( enableFreeTextIndexing ); -// RYA-215 prompt = makeFieldPrompt("Use Geospatial Indexing", true); -// final boolean enableGeoIndexing = promptBoolean(prompt, Optional.of(true)); -// builder.setEnableGeoIndex( enableGeoIndexing ); + // RYA-215 prompt = makeFieldPrompt("Use Geospatial Indexing", true); + // final boolean enableGeoIndexing = 
promptBoolean(prompt, Optional.of(true)); + // builder.setEnableGeoIndex( enableGeoIndexing ); prompt = makeFieldPrompt("Use Temporal Indexing", true); final boolean useTemporalIndexing = promptBoolean(prompt, Optional.of(true)); @@ -188,7 +188,7 @@ public interface InstallPrompt { reader.println(" Use Shard Balancing: " + installConfig.isTableHashPrefixEnabled()); reader.println(" Use Entity Centric Indexing: " + installConfig.isEntityCentrixIndexEnabled()); reader.println(" Use Free Text Indexing: " + installConfig.isFreeTextIndexEnabled()); -// RYA-215 reader.println(" Use Geospatial Indexing: " + installConfig.isGeoIndexEnabled()); + // RYA-215 reader.println(" Use Geospatial Indexing: " + installConfig.isGeoIndexEnabled()); reader.println(" Use Temporal Indexing: " + installConfig.isTemporalIndexEnabled()); reader.println(" Use Precomputed Join Indexing: " + installConfig.isPcjIndexEnabled()); if(installConfig.isPcjIndexEnabled()) { @@ -224,6 +224,10 @@ public interface InstallPrompt { final boolean useTemporalIndexing = promptBoolean(prompt, Optional.of(true)); builder.setEnableTemporalIndex( useTemporalIndexing ); + prompt = makeFieldPrompt("Use PCJ Indexing", true); + final boolean usePcjIndexing = promptBoolean(prompt, Optional.of(true)); + builder.setEnablePcjIndex(usePcjIndexing); + return builder.build(); } @@ -246,6 +250,7 @@ public interface InstallPrompt { reader.println(" Instance Name: " + instanceName); reader.println(" Use Free Text Indexing: " + installConfig.isFreeTextIndexEnabled()); reader.println(" Use Temporal Indexing: " + installConfig.isTemporalIndexEnabled()); + reader.println(" Use PCJ Indexing: " + installConfig.isPcjIndexEnabled()); reader.println(""); return promptBoolean("Continue with the install? 
(y/n) ", Optional.absent()); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java b/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java index 3eacb54..2af1507 100644 --- a/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java +++ b/extras/shell/src/main/java/org/apache/rya/shell/util/RyaDetailsFormatter.java @@ -45,7 +45,7 @@ public class RyaDetailsFormatter { * @param details - The object to format. (not null) * @return A pretty render of the object. */ - public String format(StorageType storageType, final RyaDetails details) { + public String format(final StorageType storageType, final RyaDetails details) { requireNonNull(details); final StringBuilder report = new StringBuilder(); @@ -72,41 +72,43 @@ public class RyaDetailsFormatter { report.append(" Temporal Index:\n"); report.append(" Enabled: ").append( details.getTemporalIndexDetails().isEnabled() ).append("\n"); - if(storageType == StorageType.ACCUMULO) { - report.append(" PCJ Index:\n"); - final PCJIndexDetails pcjDetails = details.getPCJIndexDetails(); - report.append(" Enabled: ").append( pcjDetails.isEnabled() ).append("\n"); - if(pcjDetails.isEnabled()) { - if(pcjDetails.getFluoDetails().isPresent()) { - final String fluoAppName = pcjDetails.getFluoDetails().get().getUpdateAppName(); - report.append(" Fluo App Name: ").append(fluoAppName).append("\n"); - } + report.append(" PCJ Index:\n"); + final PCJIndexDetails pcjDetails = details.getPCJIndexDetails(); + report.append(" Enabled: ").append( pcjDetails.isEnabled() ).append("\n"); + if(pcjDetails.isEnabled()) { + if(pcjDetails.getFluoDetails().isPresent()) { + final String fluoAppName = pcjDetails.getFluoDetails().get().getUpdateAppName(); + 
report.append(" Fluo App Name: ").append(fluoAppName).append("\n"); + } - final ImmutableMap<String, PCJDetails> pcjs = pcjDetails.getPCJDetails(); - report.append(" PCJs:\n"); - if(pcjs.isEmpty()) { - report.append(" No PCJs have been added yet.\n"); - } else { - for(final PCJDetails pcj : pcjs.values()) { - report.append(" ID: ").append(pcj.getId()).append("\n"); + final ImmutableMap<String, PCJDetails> pcjs = pcjDetails.getPCJDetails(); + report.append(" PCJs:\n"); + if(pcjs.isEmpty()) { + report.append(" No PCJs have been added yet.\n"); + } else { + for(final PCJDetails pcj : pcjs.values()) { + report.append(" ID: ").append(pcj.getId()).append("\n"); - final String updateStrategy = format( pcj.getUpdateStrategy(), "None" ); - report.append(" Update Strategy: ").append(updateStrategy).append("\n"); + final String updateStrategy = format( pcj.getUpdateStrategy(), "None" ); + report.append(" Update Strategy: ").append(updateStrategy).append("\n"); - final String lastUpdateTime = format( pcj.getLastUpdateTime(), "unavailable"); - report.append(" Last Update Time: ").append(lastUpdateTime).append("\n"); - } + final String lastUpdateTime = format( pcj.getLastUpdateTime(), "unavailable"); + report.append(" Last Update Time: ").append(lastUpdateTime).append("\n"); } } - report.append("Statistics:\n"); - report.append(" Prospector:\n"); - final String prospectorLastUpdateTime = format(details.getProspectorDetails().getLastUpdated(), "unavailable"); - report.append(" Last Update Time: ").append( prospectorLastUpdateTime).append("\n"); - - report.append(" Join Selectivity:\n"); - final String jsLastUpdateTime = format(details.getJoinSelectivityDetails().getLastUpdated(), "unavailable"); - report.append(" Last Updated Time: ").append( jsLastUpdateTime ).append("\n"); + if (storageType == StorageType.ACCUMULO) { + report.append("Statistics:\n"); + report.append(" Prospector:\n"); + final String prospectorLastUpdateTime = 
format(details.getProspectorDetails().getLastUpdated(), + "unavailable"); + report.append(" Last Update Time: ").append(prospectorLastUpdateTime).append("\n"); + + report.append(" Join Selectivity:\n"); + final String jsLastUpdateTime = format(details.getJoinSelectivityDetails().getLastUpdated(), + "unavailable"); + report.append(" Last Updated Time: ").append(jsLastUpdateTime).append("\n"); + } } return report.toString(); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/c826ffea/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java ---------------------------------------------------------------------- diff --git a/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java b/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java index d214afa..79fe95d 100644 --- a/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java +++ b/extras/shell/src/test/java/org/apache/rya/shell/MongoRyaShellIT.java @@ -56,19 +56,6 @@ public class MongoRyaShellIT extends RyaShellMongoITBase { // Ensure the connection was successful. assertTrue(connectResult.isSuccess()); } - - @Test - public void connectMongo_noConnection() throws IOException { - final JLineShellComponent shell = getTestShell(); - // Attempt to connect to a mongo instance. The bad hostname should make this fail. - final String cmd = - RyaConnectionCommands.CONNECT_MONGO_CMD + " " + - "--hostname badhostname " + - "--port " + super.conf.getMongoPort(); - - final CommandResult rez = shell.executeCommand(cmd); - assertEquals(RuntimeException.class, rez.getException().getClass()); - } @Test public void printConnectionDetails_notConnected() {