http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/MiniAccumuloClusterInstance.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/MiniAccumuloClusterInstance.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/MiniAccumuloClusterInstance.java new file mode 100644 index 0000000..eb11ed8 --- /dev/null +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/MiniAccumuloClusterInstance.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.accumulo; + +import java.io.File; +import java.io.IOException; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; +import org.apache.accumulo.minicluster.MiniAccumuloConfig; +import org.apache.log4j.Logger; + +import com.google.common.io.Files; + +/** + * Contains boilerplate code that can be used by an integration test that + * uses a {@link MiniAccumuloCluster}. 
+ * <p> + * You can just extend {@link AccumuloITBase} if your test only requires Accumulo. + */ +public class MiniAccumuloClusterInstance { + + private static final Logger log = Logger.getLogger(MiniAccumuloClusterInstance.class); + + private static final String USERNAME = "root"; + private static final String PASSWORD = "password"; + + /** + * A mini Accumulo cluster that can be used by the tests. + */ + private static MiniAccumuloCluster cluster = null; + + /** + * Start the {@link MiniAccumuloCluster}. + */ + public void startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { + final File miniDataDir = Files.createTempDir(); + + // Setup and start the Mini Accumulo. + final MiniAccumuloConfig cfg = new MiniAccumuloConfig(miniDataDir, PASSWORD); + cluster = new MiniAccumuloCluster(cfg); + cluster.start(); + } + + /** + * Stop the {@link MiniAccumuloCluster}. + */ + public void stopMiniAccumulo() throws IOException, InterruptedException { + if(cluster != null) { + try { + log.info("Shutting down the Mini Accumulo being used as a Rya store."); + cluster.stop(); + log.info("Mini Accumulo being used as a Rya store shut down."); + } catch(final Exception e) { + log.error("Could not shut down the Mini Accumulo.", e); + } + } + } + + /** + * @return The {@link MiniAccumuloCluster} managed by this class. + */ + public MiniAccumuloCluster getCluster() { + return cluster; + } + + /** + * @return An accumulo connector that is connected to the mini cluster. + */ + public Connector getConnector() throws AccumuloException, AccumuloSecurityException { + return cluster.getConnector(USERNAME, PASSWORD); + } + + /** + * @return The root username. + */ + public String getUsername() { + return USERNAME; + } + + /** + * @return The root password. + */ + public String getPassword() { + return PASSWORD; + } + + /** + * @return The MiniAccumulo's zookeeper instance name. 
+ */ + public String getInstanceName() { + return cluster.getInstanceName(); + } + + /** + * @return The MiniAccumulo's zookeepers. + */ + public String getZookeepers() { + return cluster.getZooKeepers(); + } +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/instance/AccumuloRyaDetailsRepositoryIT.java ---------------------------------------------------------------------- diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/instance/AccumuloRyaDetailsRepositoryIT.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/instance/AccumuloRyaDetailsRepositoryIT.java index 951b13d..bdd6059 100644 --- a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/instance/AccumuloRyaDetailsRepositoryIT.java +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/instance/AccumuloRyaDetailsRepositoryIT.java @@ -1,5 +1,3 @@ -package mvm.rya.accumulo.instance; - /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,6 +16,7 @@ package mvm.rya.accumulo.instance; * specific language governing permissions and limitations * under the License. */ +package mvm.rya.accumulo.instance; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -46,6 +45,8 @@ import org.junit.Test; import com.google.common.base.Optional; +import mvm.rya.accumulo.AccumuloITBase; +import mvm.rya.accumulo.MiniAccumuloClusterInstance; import mvm.rya.api.instance.RyaDetails; import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; @@ -66,27 +67,7 @@ import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; /** * Tests the methods of {@link AccumuloRyaDetailsRepository} by using a {@link MiniAccumuloCluster}. 
*/ -public class AccumuloRyaDetailsRepositoryIT { - - private static MiniAccumuloCluster cluster = null; - - @BeforeClass - public static void killLoudLogs() { - Logger.getLogger(ClientCnxn.class).setLevel(Level.ERROR); - } - - @Before - public void startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException { - // Setup the mini cluster. - final File tempDirectory = Files.createTempDirectory("testDir").toFile(); - cluster = new MiniAccumuloCluster(tempDirectory, "password"); - cluster.start(); - } - - @After - public void stopMiniAccumulo() throws IOException, InterruptedException { - cluster.stop(); - } +public class AccumuloRyaDetailsRepositoryIT extends AccumuloITBase { @Test public void initializeAndGet() throws AccumuloException, AccumuloSecurityException, AlreadyInitializedException, RyaDetailsRepositoryException { @@ -108,21 +89,16 @@ public class AccumuloRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); - // Setup the repository that will be tested using a mock instance of Accumulo. - final Instance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()); - final Connector connector = instance.getConnector("root", new PasswordToken("password")); + // Setup the repository that will be tested using a mini instance of Accumulo. 
+ final Connector connector = getClusterInstance().getConnector(); final RyaDetailsRepository repo = new AccumuloRyaInstanceDetailsRepository(connector, instanceName); // Initialize the repository @@ -155,21 +131,16 @@ public class AccumuloRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); - // Setup the repository that will be tested using a mock instance of Accumulo. - final Instance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()); - final Connector connector = instance.getConnector("root", new PasswordToken("password")); + // Setup the repository that will be tested using a mini instance of Accumulo. + final Connector connector = getClusterInstance().getConnector(); final RyaDetailsRepository repo = new AccumuloRyaInstanceDetailsRepository(connector, instanceName); // Initialize the repository @@ -181,9 +152,8 @@ public class AccumuloRyaDetailsRepositoryIT { @Test(expected = NotInitializedException.class) public void getRyaInstance_notInitialized() throws AccumuloException, AccumuloSecurityException, NotInitializedException, RyaDetailsRepositoryException { - // Setup the repository that will be tested using a mock instance of Accumulo. - final Instance instance = new MockInstance(); - final Connector connector = instance.getConnector("username", new PasswordToken("password")); + // Setup the repository that will be tested using a mini instance of Accumulo. 
+ final Connector connector = getClusterInstance().getConnector(); final RyaDetailsRepository repo = new AccumuloRyaInstanceDetailsRepository(connector, "testInstance"); // Try to fetch the details from the uninitialized repository. @@ -210,21 +180,17 @@ public class AccumuloRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); - // Setup the repository that will be tested using a mock instance of Accumulo. - final Instance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()); - final Connector connector = instance.getConnector("root", new PasswordToken("password")); + // Setup the repository that will be tested using a mini instance of Accumulo. + final MiniAccumuloClusterInstance clusterInstance = getClusterInstance(); + final Connector connector = clusterInstance.getConnector(); final RyaDetailsRepository repo = new AccumuloRyaInstanceDetailsRepository(connector, "testInstance"); // Initialize the repository @@ -237,8 +203,7 @@ public class AccumuloRyaDetailsRepositoryIT { @Test public void isInitialized_false() throws AccumuloException, AccumuloSecurityException, RyaDetailsRepositoryException { // Setup the repository that will be tested using a mock instance of Accumulo. 
- final Instance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()); - final Connector connector = instance.getConnector("root", new PasswordToken("password")); + final Connector connector = getClusterInstance().getConnector(); final RyaDetailsRepository repo = new AccumuloRyaInstanceDetailsRepository(connector, "testInstance"); // Ensure the repository reports that is has not been initialized. @@ -265,21 +230,16 @@ public class AccumuloRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); - // Setup the repository that will be tested using a mock instance of Accumulo. - final Instance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()); - final Connector connector = instance.getConnector("root", new PasswordToken("password")); + // Setup the repository that will be tested using a mini instance of Accumulo. 
+ final Connector connector = getClusterInstance().getConnector(); final RyaDetailsRepository repo = new AccumuloRyaInstanceDetailsRepository(connector, "testInstance"); // Initialize the repository @@ -318,21 +278,16 @@ public class AccumuloRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); - // Setup the repository that will be tested using a mock instance of Accumulo. - final Instance instance = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()); - final Connector connector = instance.getConnector("root", new PasswordToken("password")); + // Setup the repository that will be tested using a mini instance of Accumulo. 
+ final Connector connector = getClusterInstance().getConnector(); final RyaDetailsRepository repo = new AccumuloRyaInstanceDetailsRepository(connector, "testInstance"); // Initialize the repository http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java index 3a6e61d..282ecbb 100644 --- a/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java +++ b/dao/mongodb.rya/src/main/java/mvm/rya/mongodb/instance/MongoDetailsAdapter.java @@ -1,6 +1,4 @@ -package mvm.rya.mongodb.instance; - -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -9,7 +7,7 @@ package mvm.rya.mongodb.instance; * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -18,6 +16,9 @@ package mvm.rya.mongodb.instance; * specific language governing permissions and limitations * under the License. 
*/ +package mvm.rya.mongodb.instance; + +import static java.util.Objects.requireNonNull; import java.util.ArrayList; import java.util.Date; @@ -104,7 +105,7 @@ public class MongoDetailsAdapter { .add(VERSION_KEY, details.getRyaVersion()) .add(ENTITY_DETAILS_KEY, details.getEntityCentricIndexDetails().isEnabled()) .add(GEO_DETAILS_KEY, details.getGeoIndexDetails().isEnabled()) - .add(PCJ_DETAILS_KEY, getPCJDetailsDBObject(details)) + .add(PCJ_DETAILS_KEY, toDBObject(details.getPCJIndexDetails())) .add(TEMPORAL_DETAILS_KEY, details.getTemporalIndexDetails().isEnabled()) .add(FREETEXT_DETAILS_KEY, details.getFreeTextIndexDetails().isEnabled()); if(details.getProspectorDetails().getLastUpdated().isPresent()) { @@ -116,26 +117,48 @@ public class MongoDetailsAdapter { return (BasicDBObject) builder.get(); } - private static DBObject getPCJDetailsDBObject(final RyaDetails details) { - final PCJIndexDetails pcjDetails = details.getPCJIndexDetails(); - final BasicDBObjectBuilder pcjBuilder = BasicDBObjectBuilder.start() - .add(PCJ_ENABLED_KEY, pcjDetails.isEnabled()); - if(pcjDetails.getFluoDetails().isPresent()) { - pcjBuilder.add(PCJ_FLUO_KEY, pcjDetails.getFluoDetails().get().getUpdateAppName()); + private static DBObject toDBObject(final PCJIndexDetails pcjIndexDetails) { + requireNonNull(pcjIndexDetails); + + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start(); + + // Is Enabled + builder.add(PCJ_ENABLED_KEY, pcjIndexDetails.isEnabled()); + + // Fluo Details if present. 
+ if(pcjIndexDetails.getFluoDetails().isPresent()) { + builder.add(PCJ_FLUO_KEY, pcjIndexDetails.getFluoDetails().get().getUpdateAppName()); } - final List<DBObject> pcjDetailList = new ArrayList<>(); - for(final Entry<String, PCJDetails> entry : pcjDetails.getPCJDetails().entrySet()) { - final PCJDetails pcjDetail = entry.getValue(); - final BasicDBObjectBuilder indBuilbder = BasicDBObjectBuilder.start() - .add(PCJ_ID_KEY, pcjDetail.getId()) - .add(PCJ_UPDATE_STRAT_KEY, pcjDetail.getUpdateStrategy().name()); - if(pcjDetail.getLastUpdateTime().isPresent()) { - indBuilbder.add(PCJ_LAST_UPDATE_KEY, pcjDetail.getLastUpdateTime().get()); - } - pcjDetailList.add(indBuilbder.get()); + + // Add the PCJDetail objects. + final List<DBObject> pcjDetailsList = new ArrayList<>(); + for(final PCJDetails pcjDetails : pcjIndexDetails.getPCJDetails().values()) { + pcjDetailsList.add( toDBObject( pcjDetails ) ); + } + builder.add(PCJ_PCJS_KEY, pcjDetailsList.toArray()); + + return builder.get(); + } + + static DBObject toDBObject(final PCJDetails pcjDetails) { + requireNonNull(pcjDetails); + + final BasicDBObjectBuilder builder = BasicDBObjectBuilder.start(); + + // PCJ ID + builder.add(PCJ_ID_KEY, pcjDetails.getId()); + + // PCJ Update Strategy if present. + if(pcjDetails.getUpdateStrategy().isPresent()) { + builder.add(PCJ_UPDATE_STRAT_KEY, pcjDetails.getUpdateStrategy().get().name()); + } + + // Last Update Time if present. 
+ if(pcjDetails.getLastUpdateTime().isPresent()) { + builder.add(PCJ_LAST_UPDATE_KEY, pcjDetails.getLastUpdateTime().get()); } - pcjBuilder.add(PCJ_PCJS_KEY, pcjDetailList.toArray()); - return pcjBuilder.get(); + + return builder.get(); } public static RyaDetails toRyaDetails(final DBObject mongoObj) throws MalformedRyaDetailsException { @@ -146,7 +169,7 @@ public class MongoDetailsAdapter { .setRyaVersion(basicObj.getString(VERSION_KEY)) .setEntityCentricIndexDetails(new EntityCentricIndexDetails(basicObj.getBoolean(ENTITY_DETAILS_KEY))) .setGeoIndexDetails(new GeoIndexDetails(basicObj.getBoolean(GEO_DETAILS_KEY))) - .setPCJIndexDetails(getPCJDetails(basicObj)) + .setPCJIndexDetails(getPCJIndexDetails(basicObj)) .setTemporalIndexDetails(new TemporalIndexDetails(basicObj.getBoolean(TEMPORAL_DETAILS_KEY))) .setFreeTextDetails(new FreeTextIndexDetails(basicObj.getBoolean(FREETEXT_DETAILS_KEY))) .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(basicObj.getDate(PROSPECTOR_DETAILS_KEY)))) @@ -157,24 +180,41 @@ public class MongoDetailsAdapter { } } - private static PCJIndexDetails getPCJDetails(final BasicDBObject basicObj) { - final BasicDBObject pcjObj = (BasicDBObject) basicObj.get(PCJ_DETAILS_KEY); + private static PCJIndexDetails.Builder getPCJIndexDetails(final BasicDBObject basicObj) { + final BasicDBObject pcjIndexDBO = (BasicDBObject) basicObj.get(PCJ_DETAILS_KEY); + final PCJIndexDetails.Builder pcjBuilder = PCJIndexDetails.builder() - .setEnabled(pcjObj.getBoolean(PCJ_ENABLED_KEY)) - .setFluoDetails(new FluoDetails(pcjObj.getString(PCJ_FLUO_KEY))); - final BasicDBList pcjs = (BasicDBList) pcjObj.get(PCJ_PCJS_KEY); + .setEnabled(pcjIndexDBO.getBoolean(PCJ_ENABLED_KEY)) + .setFluoDetails(new FluoDetails(pcjIndexDBO.getString(PCJ_FLUO_KEY))); + + final BasicDBList pcjs = (BasicDBList) pcjIndexDBO.get(PCJ_PCJS_KEY); if(pcjs != null) { for(int ii = 0; ii < pcjs.size(); ii++) { final BasicDBObject pcj = (BasicDBObject) pcjs.get(ii); - 
pcjBuilder.addPCJDetails( - PCJDetails.builder() - .setId(pcj.getString(PCJ_ID_KEY)) - .setUpdateStrategy(PCJUpdateStrategy.valueOf((String)pcj.get(PCJ_UPDATE_STRAT_KEY))) - .setLastUpdateTime(pcj.getDate(PCJ_LAST_UPDATE_KEY)) - .build()); + pcjBuilder.addPCJDetails( toPCJDetails(pcj) ); } } - return pcjBuilder.build(); + return pcjBuilder; + } + + static PCJDetails.Builder toPCJDetails(final BasicDBObject dbo) { + requireNonNull(dbo); + + // PCJ ID. + final PCJDetails.Builder builder = PCJDetails.builder() + .setId( dbo.getString(PCJ_ID_KEY) ); + + // PCJ Update Strategy if present. + if(dbo.containsField(PCJ_UPDATE_STRAT_KEY)) { + builder.setUpdateStrategy( PCJUpdateStrategy.valueOf( dbo.getString(PCJ_UPDATE_STRAT_KEY) ) ); + } + + // Last Update Time if present. + if(dbo.containsField(PCJ_LAST_UPDATE_KEY)) { + builder.setLastUpdateTime( dbo.getDate(PCJ_LAST_UPDATE_KEY) ); + } + + return builder; } /** http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoDetailsAdapterTest.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoDetailsAdapterTest.java b/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoDetailsAdapterTest.java index efcd553..9faa595 100644 --- a/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoDetailsAdapterTest.java +++ b/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoDetailsAdapterTest.java @@ -43,33 +43,42 @@ import mvm.rya.api.instance.RyaDetails.ProspectorDetails; import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; import mvm.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException; +/** + * Tests the methods of {@link MongoDetailsAdapter}. 
+ */ public class MongoDetailsAdapterTest { + @Test public void ryaDetailsToMongoTest() { - final PCJIndexDetails.Builder pcjBuilder = PCJIndexDetails.builder() - .setEnabled(true) - .setFluoDetails(new FluoDetails("fluo")); - for(int ii = 0; ii < 2; ii++) { - pcjBuilder.addPCJDetails( - PCJDetails.builder() - .setId("pcj_"+ii) - .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime(new Date(ii)) - .build()); - } - + // Convert the Details into a Mongo DB OBject. final RyaDetails details = RyaDetails.builder() - .setRyaInstanceName("test") - .setRyaVersion("1") - .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) - .setGeoIndexDetails(new GeoIndexDetails(true)) - .setPCJIndexDetails(pcjBuilder.build()) - .setTemporalIndexDetails(new TemporalIndexDetails(true)) - .setFreeTextDetails(new FreeTextIndexDetails(true)) - .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L)))) - .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L)))) - .build(); + .setRyaInstanceName("test") + .setRyaVersion("1") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) + .setGeoIndexDetails(new GeoIndexDetails(true)) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails(new FluoDetails("fluo")) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_0") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime(new Date(0L))) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_1") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime(new Date(1L)))) + .setTemporalIndexDetails(new TemporalIndexDetails(true)) + .setFreeTextDetails(new FreeTextIndexDetails(true)) + .setProspectorDetails(new ProspectorDetails(Optional.fromNullable(new Date(0L)))) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.fromNullable(new Date(1L)))) + .build(); + + final BasicDBObject actual = MongoDetailsAdapter.toDBObject(details); + 
// Ensure it matches the expected object. final DBObject expected = (DBObject) JSON.parse( "{ " + "instanceName : \"test\"," @@ -97,15 +106,13 @@ public class MongoDetailsAdapterTest { + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}" + "}" ); - final BasicDBObject actual = MongoDetailsAdapter.toDBObject(details); - System.out.println(expected.toString()); - System.out.println("***"); - System.out.println(actual.toString()); + assertEquals(expected.toString(), actual.toString()); } @Test public void mongoToRyaDetailsTest() throws MalformedRyaDetailsException { + // Convert the Mongo object into a RyaDetails. final BasicDBObject mongo = (BasicDBObject) JSON.parse( "{ " + "instanceName : \"test\"," @@ -133,36 +140,41 @@ public class MongoDetailsAdapterTest { + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}" + "}" ); - final PCJIndexDetails.Builder pcjBuilder = PCJIndexDetails.builder() - .setEnabled(true) - .setFluoDetails(new FluoDetails("fluo")); - for(int ii = 0; ii < 2; ii++) { - pcjBuilder.addPCJDetails( - PCJDetails.builder() - .setId("pcj_"+ii) - .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime(new Date(ii)) - .build()); - } + final RyaDetails actual = MongoDetailsAdapter.toRyaDetails(mongo); + + // Ensure it matches the expected object. 
final RyaDetails expected = RyaDetails.builder() - .setRyaInstanceName("test") - .setRyaVersion("1") - .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) - .setGeoIndexDetails(new GeoIndexDetails(true)) - .setPCJIndexDetails(pcjBuilder.build()) - .setTemporalIndexDetails(new TemporalIndexDetails(true)) - .setFreeTextDetails(new FreeTextIndexDetails(true)) - .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L)))) - .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L)))) - .build(); + .setRyaInstanceName("test") + .setRyaVersion("1") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) + .setGeoIndexDetails(new GeoIndexDetails(true)) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails(new FluoDetails("fluo")) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_0") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime(new Date(0L))) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_1") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime(new Date(1L)))) + .setTemporalIndexDetails(new TemporalIndexDetails(true)) + .setFreeTextDetails(new FreeTextIndexDetails(true)) + .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L)))) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L)))) + .build(); - final RyaDetails actual = MongoDetailsAdapter.toRyaDetails(mongo); assertEquals(expected, actual); } @Test public void absentOptionalToRyaDetailsTest() throws MalformedRyaDetailsException { + // Convert the Mongo object into a RyaDetails. 
final BasicDBObject mongo = (BasicDBObject) JSON.parse( "{ " + "instanceName : \"test\"," @@ -175,7 +187,6 @@ public class MongoDetailsAdapterTest { + "pcjs : [ " + "{" + "id : \"pcj_1\"," - + "updateStrategy : \"INCREMENTAL\"" + "}" + "]" + "}," @@ -185,33 +196,52 @@ public class MongoDetailsAdapterTest { + "joinSelectivitiyDetails : null" + "}" ); - final PCJIndexDetails.Builder pcjBuilder = PCJIndexDetails.builder() - .setEnabled(false) - .setFluoDetails(new FluoDetails("fluo")) - .addPCJDetails( - PCJDetails.builder() - .setId("pcj_1") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .setLastUpdateTime(null).build()); + final RyaDetails actual = MongoDetailsAdapter.toRyaDetails(mongo); + // Ensure it matches the expected object. final RyaDetails expected = RyaDetails.builder() - .setRyaInstanceName("test") - .setRyaVersion("1") - .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) - .setGeoIndexDetails(new GeoIndexDetails(false)) - .setPCJIndexDetails(pcjBuilder.build()) - .setTemporalIndexDetails(new TemporalIndexDetails(false)) - .setFreeTextDetails(new FreeTextIndexDetails(true)) - .setProspectorDetails(new ProspectorDetails(Optional.<Date>absent())) - .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>absent())) - .build(); + .setRyaInstanceName("test") + .setRyaVersion("1") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) + .setGeoIndexDetails(new GeoIndexDetails(false)) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(false) + .setFluoDetails(new FluoDetails("fluo")) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_1") + .setLastUpdateTime(null))) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(true)) + .setProspectorDetails(new ProspectorDetails(Optional.<Date>absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>absent())) + .build(); - final RyaDetails actual = 
MongoDetailsAdapter.toRyaDetails(mongo); assertEquals(expected, actual); } @Test public void absentOptionalToMongoTest() { + // Convert the Details into a Mongo DB OBject. + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName("test") + .setRyaVersion("1") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) + .setGeoIndexDetails(new GeoIndexDetails(false)) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails(new FluoDetails("fluo"))) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(true)) + .setProspectorDetails(new ProspectorDetails(Optional.<Date>absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>absent())) + .build(); + + final DBObject actual = MongoDetailsAdapter.toDBObject(details); + + // Ensure it matches the expected object. final BasicDBObject expected = (BasicDBObject) JSON.parse( "{ " + "instanceName : \"test\"," @@ -227,23 +257,40 @@ public class MongoDetailsAdapterTest { + "freeTextDetails : true" + "}" ); - final PCJIndexDetails.Builder pcjBuilder = PCJIndexDetails.builder() - .setEnabled(true) - .setFluoDetails(new FluoDetails("fluo")); + assertEquals(expected, actual); + } - final RyaDetails details = RyaDetails.builder() - .setRyaInstanceName("test") - .setRyaVersion("1") - .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) - .setGeoIndexDetails(new GeoIndexDetails(false)) - .setPCJIndexDetails(pcjBuilder.build()) - .setTemporalIndexDetails(new TemporalIndexDetails(false)) - .setFreeTextDetails(new FreeTextIndexDetails(true)) - .setProspectorDetails(new ProspectorDetails(Optional.<Date>absent())) - .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>absent())) - .build(); + @Test + public void toDBObject_pcjDetails() { + final PCJDetails details = PCJDetails.builder() + .setId("pcjId") + .setLastUpdateTime( new Date() ) + .setUpdateStrategy( 
PCJUpdateStrategy.INCREMENTAL ) + .build(); - final DBObject actual = MongoDetailsAdapter.toDBObject(details); - assertEquals(expected, actual); + // Convert it into a Mongo DB Object. + final BasicDBObject dbo = (BasicDBObject) MongoDetailsAdapter.toDBObject(details); + + // Convert the dbo back into the original object. + final PCJDetails restored = MongoDetailsAdapter.toPCJDetails(dbo).build(); + + // Ensure the restored value matches the original. + assertEquals(details, restored); + } + + @Test + public void toDBObject_pcjDetails_missing_optionals() { + final PCJDetails details = PCJDetails.builder() + .setId("pcjId") + .build(); + + // Convert it into a Mongo DB Object. + final BasicDBObject dbo = (BasicDBObject) MongoDetailsAdapter.toDBObject(details); + + // Convert the dbo back into the original object. + final PCJDetails restored = MongoDetailsAdapter.toPCJDetails(dbo).build(); + + // Ensure the restored value matches the original. + assertEquals(details, restored); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java b/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java index 447eb98..2ce2e93 100644 --- a/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java +++ b/dao/mongodb.rya/src/test/java/mvm/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java @@ -32,7 +32,6 @@ import org.junit.BeforeClass; import org.junit.Test; import com.google.common.base.Optional; -import com.mongodb.DB; import com.mongodb.MongoClient; import com.mongodb.MongoException; @@ -60,7 +59,7 @@ import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; public class MongoRyaDetailsRepositoryIT 
{ private static MongoClient client = null; - private static DB testDB = null; + @BeforeClass public static void startMiniAccumulo() throws MongoException, IOException { final MongodForTestsFactory mongoFactory = new MongodForTestsFactory(); @@ -70,7 +69,6 @@ public class MongoRyaDetailsRepositoryIT { @Before public void clearLastTest() { client.dropDatabase("testInstance"); - testDB = client.getDB("testInstance"); } @AfterClass @@ -98,14 +96,10 @@ public class MongoRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); @@ -143,14 +137,10 @@ public class MongoRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); @@ -194,14 +184,10 @@ public class MongoRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( 
new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); @@ -245,14 +231,10 @@ public class MongoRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); @@ -296,14 +278,10 @@ public class MongoRyaDetailsRepositoryIT { PCJDetails.builder() .setId("pcj 1") .setUpdateStrategy(PCJUpdateStrategy.BATCH) - .setLastUpdateTime( new Date() ) - .build()) + .setLastUpdateTime( new Date() )) .addPCJDetails( PCJDetails.builder() - .setId("pcj 2") - .setUpdateStrategy(PCJUpdateStrategy.INCREMENTAL) - .build()) - .build()) + .setId("pcj 2"))) .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) .build(); @@ -316,12 +294,12 @@ public class MongoRyaDetailsRepositoryIT { // Create a new state for the details. final RyaDetails wrongOriginal = new RyaDetails.Builder( details ) - .setTemporalIndexDetails( new TemporalIndexDetails(false) ) - .build(); + .setTemporalIndexDetails( new TemporalIndexDetails(false) ) + .build(); final RyaDetails updated = new RyaDetails.Builder( details ) - .setGeoIndexDetails( new GeoIndexDetails(false) ) - .build(); + .setGeoIndexDetails( new GeoIndexDetails(false) ) + .build(); // Try to execute the update where the old state is not the currently stored state. 
repo.update(wrongOriginal, updated); http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/pom.xml ---------------------------------------------------------------------- diff --git a/extras/indexing/pom.xml b/extras/indexing/pom.xml index ad38855..bfa02f8 100644 --- a/extras/indexing/pom.xml +++ b/extras/indexing/pom.xml @@ -96,6 +96,17 @@ <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>io.fluo</groupId> + <artifactId>fluo-mini</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>accumulo.rya</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + </dependency> </dependencies> <build> <pluginManagement> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCommand.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCommand.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCommand.java new file mode 100644 index 0000000..078e985 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCommand.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; + +/** + * An abstract class that holds onto Accumulo access information. Extend this + * when implementing a command that interacts with Accumulo. + */ +@ParametersAreNonnullByDefault +public abstract class AccumuloCommand { + + private final AccumuloConnectionDetails connectionDetails; + private final Connector connector; + + /** + * Constructs an instance of {@link AccumuloCommand}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programmatic access to the instance of Accumulo + * that hosts Rya instance. (not null) + */ + public AccumuloCommand( + final AccumuloConnectionDetails connectionDetails, + final Connector connector) { + this.connectionDetails = requireNonNull( connectionDetails ); + this.connector = requireNonNull(connector); + } + + /** + * @return Details about the values that were used to create the connector to the cluster. (not null) + */ + public AccumuloConnectionDetails getAccumuloConnectionDetails() { + return connectionDetails; + } + + /** + * @return Provides programmatic access to the instance of Accumulo that hosts Rya instance.
+ */ + public Connector getConnector() { + return connector; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloConnectionDetails.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloConnectionDetails.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloConnectionDetails.java new file mode 100644 index 0000000..c0a7be7 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloConnectionDetails.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; +import javax.annotation.concurrent.Immutable; + +/** + * The information that the shell used to connect to Accumulo. 
+ */ +@Immutable +@ParametersAreNonnullByDefault +public class AccumuloConnectionDetails { + private final String username; + private final char[] password; + private final String instanceName; + private final String zookeepers; + + /** + * Constructs an instance of {@link AccumuloConnectionDetails}. + * + * @param username - The username that was used to establish the connection. (not null) + * @param password - The password that was used to establish the connection. (not null) + * @param instanceName - The Accumulo instance name that was used to establish the connection. (not null) + * @param zookeepers - The list of zookeeper hostnames that were used to establish the connection. (not null) + */ + public AccumuloConnectionDetails( + final String username, + final char[] password, + final String instanceName, + final String zookeepers) { + this.username = requireNonNull(username); + this.password = requireNonNull(password); + this.instanceName = requireNonNull(instanceName); + this.zookeepers = requireNonNull(zookeepers); + } + + /** + * @return The username that was used to establish the connection. + */ + public String getUsername() { + return username; + } + + /** + * @return The password that was used to establish the connection. + */ + public char[] getPassword() { + return password; + } + + /** + * @return The Accumulo instance name that was used to establish the connection. + */ + public String getInstanceName() { + return instanceName; + } + + /** + * @return The list of zookeeper hostnames that were used to establish the connection.
+ */ + public String getZookeepers() { + return zookeepers; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java new file mode 100644 index 0000000..92b5d8c --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloCreatePCJ.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.indexing.pcj.storage.PcjException; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.openrdf.query.MalformedQueryException; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.repository.RepositoryException; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; + +import io.fluo.api.client.FluoClient; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.client.CreatePCJ; +import mvm.rya.api.client.GetInstanceDetails; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import mvm.rya.api.instance.RyaDetailsUpdater; +import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator; +import mvm.rya.api.instance.RyaDetailsUpdater.RyaDetailsMutator.CouldNotApplyMutationException; +import mvm.rya.rdftriplestore.RdfCloudTripleStore; +import mvm.rya.rdftriplestore.RyaSailRepository; + +/** + * An Accumulo implementation of 
the {@link CreatePCJ} command. + */ +@ParametersAreNonnullByDefault +public class AccumuloCreatePCJ extends AccumuloCommand implements CreatePCJ { + + private final GetInstanceDetails getInstanceDetails; + + /** + * Constructs an instance of {@link AccumuloCreatePCJ}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloCreatePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector); + } + + @Override + public String createPCJ(final String instanceName, final String sparql) throws InstanceDoesNotExistException, RyaClientException { + requireNonNull(instanceName); + requireNonNull(sparql); + + final Optional<RyaDetails> ryaDetailsHolder = getInstanceDetails.getDetails(instanceName); + final boolean ryaInstanceExists = ryaDetailsHolder.isPresent(); + if(!ryaInstanceExists) { + throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName)); + } + + final PCJIndexDetails pcjIndexDetails = ryaDetailsHolder.get().getPCJIndexDetails(); + final boolean pcjIndexingEnabeld = pcjIndexDetails.isEnabled(); + if(!pcjIndexingEnabeld) { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); + } + + final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); + final boolean usingFluo = fluoDetailsHolder.isPresent(); + if(!usingFluo) { + throw new RyaClientException( String.format("Can not create a PCJ for the '%s' instance of Rya because it does" + + "not have a Fluo application associated with it. 
Update the instance's PCJ Index Details to fix this problem.", instanceName) ); + } + + // Create the PCJ table that will receive the index results. + final String pcjId; + final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(getConnector(), instanceName); + try { + pcjId = pcjStorage.createPcj(sparql); + } catch (final PCJStorageException e) { + throw new RyaClientException("Problem while initializing the PCJ table.", e); + } + + // Task the Fluo application with updating the PCJ. + final String fluoAppName = fluoDetailsHolder.get().getUpdateAppName(); + try { + updateFluoApp(instanceName, fluoAppName, pcjStorage, pcjId); + } catch (RepositoryException | MalformedQueryException | SailException | QueryEvaluationException | PcjException e) { + throw new RyaClientException("Problem while initializing the Fluo application with the new PCJ.", e); + } + + // Update the Rya Details to indicate the PCJ is being updated incrementally. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); + try { + new RyaDetailsUpdater(detailsRepo).update(new RyaDetailsMutator() { + @Override + public RyaDetails mutate(final RyaDetails originalDetails) throws CouldNotApplyMutationException { + // Update the original PCJ Details to indicate they are incrementally updated. + final PCJDetails originalPCJDetails = originalDetails.getPCJIndexDetails().getPCJDetails().get(pcjId); + final PCJDetails.Builder mutatedPCJDetails = PCJDetails.builder( originalPCJDetails ) + .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL ); + + // Replace the old PCJ Details with the updated ones. 
+ final RyaDetails.Builder builder = RyaDetails.builder(originalDetails); + builder.getPCJIndexDetails().addPCJDetails( mutatedPCJDetails ); + return builder.build(); + } + }); + } catch (RyaDetailsRepositoryException | CouldNotApplyMutationException e) { + throw new RyaClientException("Problem while updating the Rya instance's Details to indicate the PCJ is being incrementally updated.", e); + } + + // Return the ID that was assigned to the PCJ. + return pcjId; + } + + private void updateFluoApp(final String ryaInstance, final String fluoAppName, final PrecomputedJoinStorage pcjStorage, final String pcjId) throws RepositoryException, MalformedQueryException, SailException, QueryEvaluationException, PcjException { + requireNonNull(pcjStorage); + requireNonNull(pcjId); + + // Connect to the Fluo application that is updating this instance's PCJs. + final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); + final FluoClient fluoClient = new FluoClientFactory().connect( + cd.getUsername(), + new String(cd.getPassword()), + cd.getInstanceName(), + cd.getZookeepers(), + fluoAppName); + + // Setup the Rya client that is able to talk to scan Rya's statements. + final RyaSailRepository ryaSailRepo = makeRyaRepository(getConnector(), ryaInstance); + + // Initialize the PCJ within the Fluo application. + final org.apache.rya.indexing.pcj.fluo.api.CreatePcj fluoCreatePcj = new org.apache.rya.indexing.pcj.fluo.api.CreatePcj(); + fluoCreatePcj.withRyaIntegration(pcjId, pcjStorage, fluoClient, ryaSailRepo); + } + + private static RyaSailRepository makeRyaRepository(final Connector connector, final String ryaInstance) throws RepositoryException { + checkNotNull(connector); + checkNotNull(ryaInstance); + + // Setup Rya configuration values. + final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + ryaConf.setTablePrefix( ryaInstance ); + + // Connect to the Rya repo using the provided Connector. 
+ final AccumuloRyaDAO accumuloRyaDao = new AccumuloRyaDAO(); + accumuloRyaDao.setConnector(connector); + accumuloRyaDao.setConf(ryaConf); + + final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore(); + ryaStore.setRyaDAO(accumuloRyaDao); + + final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore); + ryaRepo.initialize(); + return ryaRepo; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJ.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJ.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJ.java new file mode 100644 index 0000000..d170fab --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloDeletePCJ.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; +import org.apache.rya.indexing.pcj.fluo.api.DeletePcj; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage; +import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage.PCJStorageException; +import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Optional; + +import io.fluo.api.client.FluoClient; +import mvm.rya.api.client.DeletePCJ; +import mvm.rya.api.client.GetInstanceDetails; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; + +/** + * An Accumulo implementation of the {@link DeletePCJ} command. + */ +@ParametersAreNonnullByDefault +public class AccumuloDeletePCJ extends AccumuloCommand implements DeletePCJ { + + private static final Logger log = LoggerFactory.getLogger(AccumuloDeletePCJ.class); + + private final GetInstanceDetails getInstanceDetails; + + /** + * Constructs an instance of {@link AccumuloDeletePCJ}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. 
(not null) + */ + public AccumuloDeletePCJ(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + getInstanceDetails = new AccumuloGetInstanceDetails(connectionDetails, connector); + } + + @Override + public void deletePCJ(final String instanceName, final String pcjId) throws InstanceDoesNotExistException, RyaClientException { + requireNonNull(instanceName); + requireNonNull(pcjId); + + final Optional<RyaDetails> originalDetails = getInstanceDetails.getDetails(instanceName); + final boolean ryaInstanceExists = originalDetails.isPresent(); + if(!ryaInstanceExists) { + throw new InstanceDoesNotExistException(String.format("The '%s' instance of Rya does not exist.", instanceName)); + } + + final boolean pcjIndexingEnabeld = originalDetails.get().getPCJIndexDetails().isEnabled(); + if(!pcjIndexingEnabeld) { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ Indexing enabled.", instanceName)); + } + + final boolean pcjExists = originalDetails.get().getPCJIndexDetails().getPCJDetails().containsKey( pcjId ); + if(!pcjExists) { + throw new RyaClientException(String.format("The '%s' instance of Rya does not have PCJ with ID '%s'.", instanceName, pcjId)); + } + + // If the PCJ was being maintained by a Fluo application, then stop that process. 
+ final PCJIndexDetails pcjIndexDetails = originalDetails.get().getPCJIndexDetails(); + final PCJDetails droppedPcjDetails = pcjIndexDetails.getPCJDetails().get( pcjId ); + if(droppedPcjDetails.getUpdateStrategy().isPresent()) { + if(droppedPcjDetails.getUpdateStrategy().get() == PCJUpdateStrategy.INCREMENTAL) { + final Optional<FluoDetails> fluoDetailsHolder = pcjIndexDetails.getFluoDetails(); + + if(fluoDetailsHolder.isPresent()) { + final String fluoAppName = pcjIndexDetails.getFluoDetails().get().getUpdateAppName(); + stopUpdatingPCJ(fluoAppName, pcjId); + } else { + log.error(String.format("Could not stop the Fluo application from updating the PCJ because the Fluo Details are " + + "missing for the Rya instance named '%s'.", instanceName)); + } + } + } + + // Drop the table that holds the PCJ results from Accumulo. + final PrecomputedJoinStorage pcjs = new AccumuloPcjStorage(getConnector(), instanceName); + try { + pcjs.dropPcj(pcjId); + } catch (final PCJStorageException e) { + throw new RyaClientException("Could not drop the PCJ's table from Accumulo.", e); + } + } + + private void stopUpdatingPCJ(final String fluoAppName, final String pcjId) { + requireNonNull(fluoAppName); + requireNonNull(pcjId); + + // Connect to the Fluo application that is updating this instance's PCJs. + final AccumuloConnectionDetails cd = super.getAccumuloConnectionDetails(); + final FluoClient fluoClient = new FluoClientFactory().connect( + cd.getUsername(), + new String(cd.getPassword()), + cd.getInstanceName(), + cd.getZookeepers(), + fluoAppName); + + // Delete the PCJ from the Fluo App. 
+ new DeletePcj(1000).deletePcj(fluoClient, pcjId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetails.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetails.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetails.java new file mode 100644 index 0000000..a2fed78 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloGetInstanceDetails.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.Connector; + +import com.google.common.base.Optional; + +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.client.GetInstanceDetails; +import mvm.rya.api.client.InstanceDoesNotExistException; +import mvm.rya.api.client.InstanceExists; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; + +/** + * An Accumulo implementation of the {@link GetInstanceDetails} command. + */ +@ParametersAreNonnullByDefault +public class AccumuloGetInstanceDetails extends AccumuloCommand implements GetInstanceDetails { + + private final InstanceExists instanceExists; + + /** + * Constructs an instance of {@link AccumuloGetInstanceDetails}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloGetInstanceDetails(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + instanceExists = new AccumuloInstanceExists(connectionDetails, connector); + } + + @Override + public Optional<RyaDetails> getDetails(final String instanceName) throws InstanceDoesNotExistException, RyaClientException { + requireNonNull(instanceName); + + // Ensure the Rya instance exists. 
+ if(!instanceExists.exists(instanceName)) { + throw new InstanceDoesNotExistException(String.format("There is no Rya instance named '%s'.", instanceName)); + } + + // If the instance has details, then return them. + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); + try { + return Optional.of( detailsRepo.getRyaInstanceDetails() ); + } catch (final NotInitializedException e) { + return Optional.absent(); + } catch (final RyaDetailsRepositoryException e) { + throw new RyaClientException("Could not fetch the Rya instance's details.", e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/fcc98bd9/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstall.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstall.java b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstall.java new file mode 100644 index 0000000..841be01 --- /dev/null +++ b/extras/indexing/src/main/java/mvm/rya/api/client/accumulo/AccumuloInstall.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package mvm.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.util.Date; + +import javax.annotation.ParametersAreNonnullByDefault; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; + +import com.google.common.base.Optional; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository; +import mvm.rya.api.client.Install; +import mvm.rya.api.client.InstanceExists; +import mvm.rya.api.client.RyaClientException; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; +import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.ProspectorDetails; +import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.indexing.accumulo.ConfigUtils; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinStorageType; +import mvm.rya.indexing.external.PrecomputedJoinIndexerConfig.PrecomputedJoinUpdaterType; +import mvm.rya.rdftriplestore.inference.InferenceEngineException; +import mvm.rya.sail.config.RyaSailFactory; + +/** + * An Accumulo 
implementation of the {@link Install} command. + */ + +@ParametersAreNonnullByDefault +public class AccumuloInstall extends AccumuloCommand implements Install { + + private final InstanceExists instanceExists; + + /** + * Constructs an instance of {@link AccumuloInstall}. + * + * @param connectionDetails - Details about the values that were used to create the connector to the cluster. (not null) + * @param connector - Provides programatic access to the instance of Accumulo that hosts Rya instance. (not null) + */ + public AccumuloInstall(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + instanceExists = new AccumuloInstanceExists(connectionDetails, connector); + } + + @Override + public void install(final String instanceName, final InstallConfiguration installConfig) throws DuplicateInstanceNameException, RyaClientException { + requireNonNull(instanceName); + requireNonNull(installConfig); + + // Check to see if a Rya instance has already been installed with this name. + if(instanceExists.exists(instanceName)) { + throw new DuplicateInstanceNameException("An instance of Rya has already been installed to this Rya storage " + + "with the name '" + instanceName + "'. Try again with a different name."); + } + + // Initialize the Rya Details table. + RyaDetails details; + try { + details = initializeRyaDetails(instanceName, installConfig); + } catch (final AlreadyInitializedException e) { + // This can only happen if somebody else installs an instance of Rya with the name between the check and now. + throw new DuplicateInstanceNameException("An instance of Rya has already been installed to this Rya storage " + + "with the name '" + instanceName + "'. Try again with a different name."); + } catch (final RyaDetailsRepositoryException e) { + throw new RyaClientException("The RyaDetails couldn't be initialized. 
Details: " + e.getMessage(), e); + } + + // Initialize the rest of the tables used by the Rya instance. + final AccumuloRdfConfiguration ryaConfig = makeRyaConfig(getAccumuloConnectionDetails(), details); + try { + final Sail ryaSail = RyaSailFactory.getInstance(ryaConfig); + ryaSail.shutDown(); + } catch (final AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { + throw new RyaClientException("Could not initialize all of the tables for the new Rya instance. " + + "This instance may be left in a bad state.", e); + } catch (final SailException e) { + throw new RyaClientException("Problem shutting down the Sail object used to install Rya.", e); + } + } + + /** + * @return The version of the application as reported by the manifest. + */ + private String getVersion() { + return "" + this.getClass().getPackage().getImplementationVersion(); + } + + /** + * Initializes the {@link RyaDetails} and stores them for the new instance. + * + * @param instanceName - The name of the instance that is being created. (not null) + * @param installConfig - The instance's install configuration. (not null) + * @return The {@link RyaDetails} that were stored. + * @throws AlreadyInitializedException Could not be initialized because + * a table with this instance name has already exists and is holding the details. + * @throws RyaDetailsRepositoryException Something caused the initialization + * operation to fail. + */ + private RyaDetails initializeRyaDetails(final String instanceName, final InstallConfiguration installConfig) + throws AlreadyInitializedException, RyaDetailsRepositoryException { + final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(getConnector(), instanceName); + + // Build the PCJ Index details. 
+ final PCJIndexDetails.Builder pcjDetailsBuilder = PCJIndexDetails.builder() + .setEnabled(installConfig.isPcjIndexEnabled()); + if(installConfig.getFluoPcjAppName().isPresent()) { + final String fluoPcjAppName = installConfig.getFluoPcjAppName().get(); + pcjDetailsBuilder.setFluoDetails(new FluoDetails( fluoPcjAppName )); + } + + final RyaDetails details = RyaDetails.builder() + // General Metadata + .setRyaInstanceName(instanceName) + .setRyaVersion( getVersion() ) + + // Secondary Index Values + .setGeoIndexDetails( + new GeoIndexDetails(installConfig.isGeoIndexEnabled())) + .setTemporalIndexDetails( + new TemporalIndexDetails(installConfig.isTemporalIndexEnabled())) + .setFreeTextDetails( + new FreeTextIndexDetails(installConfig.isFreeTextIndexEnabled())) + .setEntityCentricIndexDetails( + new EntityCentricIndexDetails(installConfig.isEntityCentrixIndexEnabled())) + .setPCJIndexDetails( pcjDetailsBuilder ) + + // Statistics values. + .setProspectorDetails( + new ProspectorDetails(Optional.<Date>absent()) ) + .setJoinSelectivityDetails( + new JoinSelectivityDetails(Optional.<Date>absent()) ) + .build(); + + // Initialize the table. + detailsRepo.initialize(details); + + return details; + } + + /** + * Builds a {@link AccumuloRdfConfiguration} object that will be used by the + * Rya DAO to initialize all of the tables it will need. + * + * @param connectionDetails - Indicates how to connect to Accumulo. (not null) + * @param details - Indicates what needs to be installed. (not null) + * @return A Rya Configuration object that can be used to perform the install. + */ + private static AccumuloRdfConfiguration makeRyaConfig(final AccumuloConnectionDetails connectionDetails, final RyaDetails details) { + final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + + // The Rya Instance Name is used as a prefix for the index tables in Accumulo. 
+ conf.setTablePrefix( details.getRyaInstanceName() ); + + // Enable the indexers that the instance is configured to use. + conf.set(ConfigUtils.USE_GEO, "" + details.getGeoIndexDetails().isEnabled() ); + conf.set(ConfigUtils.USE_FREETEXT, "" + details.getFreeTextIndexDetails().isEnabled() ); + conf.set(ConfigUtils.USE_TEMPORAL, "" + details.getTemporalIndexDetails().isEnabled() ); + conf.set(ConfigUtils.USE_ENTITY, "" + details.getEntityCentricIndexDetails().isEnabled()); + + conf.set(ConfigUtils.USE_PCJ, "" + details.getPCJIndexDetails().isEnabled() ); + conf.set(ConfigUtils.PCJ_STORAGE_TYPE, PrecomputedJoinStorageType.ACCUMULO.toString()); + + final Optional<FluoDetails> fluoHolder = details.getPCJIndexDetails().getFluoDetails(); + final PrecomputedJoinUpdaterType updaterType = fluoHolder.isPresent() ? PrecomputedJoinUpdaterType.FLUO : PrecomputedJoinUpdaterType.NO_UPDATE; + conf.set(ConfigUtils.PCJ_UPDATER_TYPE, updaterType.toString()); + + // XXX The Accumulo implementation of the secondary indices make need all + // of the accumulo connector's parameters to initialize themselves, so + // we need to include them here. It would be nice if the secondary + // indexers used the connector that is provided to them instead of + // building a new one. + conf.set(ConfigUtils.CLOUDBASE_USER, connectionDetails.getUsername()); + conf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connectionDetails.getPassword())); + conf.set(ConfigUtils.CLOUDBASE_INSTANCE, connectionDetails.getInstanceName()); + conf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connectionDetails.getZookeepers()); + + // This initializes the living indexers that will be used by the application and + // caches them within the configuration object so that they may be used later. + ConfigUtils.setIndexers(conf); + + return conf; + } +} \ No newline at end of file
