http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/SimpleMongoDBStorageStrategyTest.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/SimpleMongoDBStorageStrategyTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/SimpleMongoDBStorageStrategyTest.java new file mode 100644 index 0000000..be5fdb7 --- /dev/null +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/SimpleMongoDBStorageStrategyTest.java @@ -0,0 +1,85 @@ +package mvm.rya.mongodb; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.junit.Assert.assertEquals; +import static org.openrdf.model.vocabulary.XMLSchema.ANYURI; + +import java.io.IOException; + +import org.junit.Test; + +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; +import com.mongodb.MongoException; + +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaStatement.RyaStatementBuilder; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy; + +public class SimpleMongoDBStorageStrategyTest { + private static final String SUBJECT = "http://subject.com"; + private static final String PREDICATE = "http://temp.com"; + private static final String OBJECT = "http://object.com"; + private static final String CONTEXT = "http://context.com"; + + private static final RyaStatement testStatement; + private static final DBObject testDBO; + private final SimpleMongoDBStorageStrategy storageStrategy = new SimpleMongoDBStorageStrategy(); + + static { + final RyaStatementBuilder builder = new RyaStatementBuilder(); + builder.setPredicate(new RyaURI(PREDICATE)); + builder.setSubject(new RyaURI(SUBJECT)); + builder.setObject(new RyaURI(OBJECT)); + builder.setContext(new RyaURI(CONTEXT)); + builder.setTimestamp(null); + testStatement = builder.build(); + + testDBO = new BasicDBObject(); + testDBO.put("_id", "d5f8fea0e85300478da2c9b4e132c69502e21221"); + testDBO.put("subject", SUBJECT); + testDBO.put("predicate", PREDICATE); + testDBO.put("object", OBJECT); + testDBO.put("objectType", ANYURI.stringValue()); + testDBO.put("context", CONTEXT); + testDBO.put("insertTimestamp", null); + } + + @Test + public void testSerializeStatementToDBO() throws RyaDAOException, MongoException, IOException { + + final DBObject dbo = storageStrategy.serialize(testStatement); + assertEquals(testDBO, dbo); + } + + @Test + public void testDeSerializeStatementToDBO() throws RyaDAOException, MongoException, IOException { + final RyaStatement statement = storageStrategy.deserializeDBObject(testDBO); + /** + * Since RyaStatement creates a timestamp using JVM time if the timestamp is null, we want to re-null it + * for this test. Timestamp is created at insert time by the Server, this test + * can be found in the RyaDAO. + */ + statement.setTimestamp(null); + assertEquals(testStatement, statement); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java new file mode 100644 index 0000000..9faa595 --- /dev/null +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoDetailsAdapterTest.java @@ -0,0 +1,296 @@ +package mvm.rya.mongodb.instance; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.junit.Assert.assertEquals; + +import java.util.Date; + +import org.junit.Test; + +import com.google.common.base.Optional; +import com.mongodb.BasicDBObject; +import com.mongodb.DBObject; +import com.mongodb.util.JSON; + +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; +import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import mvm.rya.api.instance.RyaDetails.ProspectorDetails; +import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; +import mvm.rya.mongodb.instance.MongoDetailsAdapter.MalformedRyaDetailsException; + +/** + * Tests the methods of {@link MongoDetailsAdapter}. + */ +public class MongoDetailsAdapterTest { + + @Test + public void ryaDetailsToMongoTest() { + // Convert the Details into a Mongo DB OBject. + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName("test") + .setRyaVersion("1") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) + .setGeoIndexDetails(new GeoIndexDetails(true)) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails(new FluoDetails("fluo")) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_0") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime(new Date(0L))) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_1") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime(new Date(1L)))) + .setTemporalIndexDetails(new TemporalIndexDetails(true)) + .setFreeTextDetails(new FreeTextIndexDetails(true)) + .setProspectorDetails(new ProspectorDetails(Optional.fromNullable(new Date(0L)))) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.fromNullable(new Date(1L)))) + .build(); + + final BasicDBObject actual = MongoDetailsAdapter.toDBObject(details); + + // Ensure it matches the expected object. + final DBObject expected = (DBObject) JSON.parse( + "{ " + + "instanceName : \"test\"," + + "version : \"1\"," + + "entityCentricDetails : true," + + "geoDetails : true," + + "pcjDetails : {" + + "enabled : true ," + + "fluoName : \"fluo\"," + + "pcjs : [ " + + "{" + + "id : \"pcj_0\"," + + "updateStrategy : \"BATCH\"," + + "lastUpdate : { $date : \"1970-01-01T00:00:00.000Z\"}" + + "}," + + "{" + + "id : \"pcj_1\"," + + "updateStrategy : \"BATCH\"," + + "lastUpdate : { $date : \"1970-01-01T00:00:00.001Z\"}" + + "}]" + + "}," + + "temporalDetails : true," + + "freeTextDetails : true," + + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"}," + + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}" + + "}" + ); + + assertEquals(expected.toString(), actual.toString()); + } + + @Test + public void mongoToRyaDetailsTest() throws MalformedRyaDetailsException { + // Convert the Mongo object into a RyaDetails. + final BasicDBObject mongo = (BasicDBObject) JSON.parse( + "{ " + + "instanceName : \"test\"," + + "version : \"1\"," + + "entityCentricDetails : true," + + "geoDetails : true," + + "pcjDetails : {" + + "enabled : true ," + + "fluoName : \"fluo\"," + + "pcjs : [ " + + "{" + + "id : \"pcj_0\"," + + "updateStrategy : \"BATCH\"," + + "lastUpdate : { $date : \"1970-01-01T00:00:00.000Z\"}" + + "}," + + "{" + + "id : \"pcj_1\"," + + "updateStrategy : \"BATCH\"," + + "lastUpdate : { $date : \"1970-01-01T00:00:00.001Z\"}" + + "}]" + + "}," + + "temporalDetails : true," + + "freeTextDetails : true," + + "prospectorDetails : { $date : \"1970-01-01T00:00:00.000Z\"}," + + "joinSelectivitiyDetails : { $date : \"1970-01-01T00:00:00.001Z\"}" + + "}" + ); + + final RyaDetails actual = MongoDetailsAdapter.toRyaDetails(mongo); + + // Ensure it matches the expected object. + final RyaDetails expected = RyaDetails.builder() + .setRyaInstanceName("test") + .setRyaVersion("1") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) + .setGeoIndexDetails(new GeoIndexDetails(true)) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails(new FluoDetails("fluo")) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_0") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime(new Date(0L))) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_1") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime(new Date(1L)))) + .setTemporalIndexDetails(new TemporalIndexDetails(true)) + .setFreeTextDetails(new FreeTextIndexDetails(true)) + .setProspectorDetails(new ProspectorDetails(Optional.<Date>fromNullable(new Date(0L)))) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>fromNullable(new Date(1L)))) + .build(); + + assertEquals(expected, actual); + } + + @Test + public void absentOptionalToRyaDetailsTest() throws MalformedRyaDetailsException { + // Convert the Mongo object into a RyaDetails. + final BasicDBObject mongo = (BasicDBObject) JSON.parse( + "{ " + + "instanceName : \"test\"," + + "version : \"1\"," + + "entityCentricDetails : true," + + "geoDetails : false," + + "pcjDetails : {" + + "enabled : false," + + "fluoName : \"fluo\"," + + "pcjs : [ " + + "{" + + "id : \"pcj_1\"," + + "}" + + "]" + + "}," + + "temporalDetails : false," + + "freeTextDetails : true," + + "prospectorDetails : null," + + "joinSelectivitiyDetails : null" + + "}" + ); + final RyaDetails actual = MongoDetailsAdapter.toRyaDetails(mongo); + + // Ensure it matches the expected object. + final RyaDetails expected = RyaDetails.builder() + .setRyaInstanceName("test") + .setRyaVersion("1") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) + .setGeoIndexDetails(new GeoIndexDetails(false)) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(false) + .setFluoDetails(new FluoDetails("fluo")) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj_1") + .setLastUpdateTime(null))) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(true)) + .setProspectorDetails(new ProspectorDetails(Optional.<Date>absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>absent())) + .build(); + + assertEquals(expected, actual); + } + + @Test + public void absentOptionalToMongoTest() { + // Convert the Details into a Mongo DB OBject. + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName("test") + .setRyaVersion("1") + .setEntityCentricIndexDetails(new EntityCentricIndexDetails(true)) + .setGeoIndexDetails(new GeoIndexDetails(false)) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails(new FluoDetails("fluo"))) + .setTemporalIndexDetails(new TemporalIndexDetails(false)) + .setFreeTextDetails(new FreeTextIndexDetails(true)) + .setProspectorDetails(new ProspectorDetails(Optional.<Date>absent())) + .setJoinSelectivityDetails(new JoinSelectivityDetails(Optional.<Date>absent())) + .build(); + + final DBObject actual = MongoDetailsAdapter.toDBObject(details); + + // Ensure it matches the expected object. + final BasicDBObject expected = (BasicDBObject) JSON.parse( + "{ " + + "instanceName : \"test\"," + + "version : \"1\"," + + "entityCentricDetails : true," + + "geoDetails : false," + + "pcjDetails : {" + + "enabled : true," + + "fluoName : \"fluo\"," + + "pcjs : [ ]" + + "}," + + "temporalDetails : false," + + "freeTextDetails : true" + + "}" + ); + assertEquals(expected, actual); + } + + @Test + public void toDBObject_pcjDetails() { + final PCJDetails details = PCJDetails.builder() + .setId("pcjId") + .setLastUpdateTime( new Date() ) + .setUpdateStrategy( PCJUpdateStrategy.INCREMENTAL ) + .build(); + + // Convert it into a Mongo DB Object. + final BasicDBObject dbo = (BasicDBObject) MongoDetailsAdapter.toDBObject(details); + + // Convert the dbo back into the original object. + final PCJDetails restored = MongoDetailsAdapter.toPCJDetails(dbo).build(); + + // Ensure the restored value matches the original. + assertEquals(details, restored); + } + + @Test + public void toDBObject_pcjDetails_missing_optionals() { + final PCJDetails details = PCJDetails.builder() + .setId("pcjId") + .build(); + + // Convert it into a Mongo DB Object. + final BasicDBObject dbo = (BasicDBObject) MongoDetailsAdapter.toDBObject(details); + + // Convert the dbo back into the original object. + final PCJDetails restored = MongoDetailsAdapter.toPCJDetails(dbo).build(); + + // Ensure the restored value matches the original. + assertEquals(details, restored); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java ---------------------------------------------------------------------- diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java new file mode 100644 index 0000000..2ce2e93 --- /dev/null +++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/instance/MongoRyaDetailsRepositoryIT.java @@ -0,0 +1,307 @@ +package mvm.rya.mongodb.instance; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Date; + +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.base.Optional; +import com.mongodb.MongoClient; +import com.mongodb.MongoException; + +import de.flapdoodle.embed.mongo.tests.MongodForTestsFactory; +import mvm.rya.api.instance.RyaDetails; +import mvm.rya.api.instance.RyaDetails.EntityCentricIndexDetails; +import mvm.rya.api.instance.RyaDetails.FreeTextIndexDetails; +import mvm.rya.api.instance.RyaDetails.GeoIndexDetails; +import mvm.rya.api.instance.RyaDetails.JoinSelectivityDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.FluoDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails; +import mvm.rya.api.instance.RyaDetails.PCJIndexDetails.PCJDetails.PCJUpdateStrategy; +import mvm.rya.api.instance.RyaDetails.ProspectorDetails; +import mvm.rya.api.instance.RyaDetails.TemporalIndexDetails; +import mvm.rya.api.instance.RyaDetailsRepository; +import mvm.rya.api.instance.RyaDetailsRepository.AlreadyInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.ConcurrentUpdateException; +import mvm.rya.api.instance.RyaDetailsRepository.NotInitializedException; +import mvm.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException; + +/** + * Tests the methods of {@link AccumuloRyaDetailsRepository} by using a {@link MiniAccumuloCluster}. + */ +public class MongoRyaDetailsRepositoryIT { + + private static MongoClient client = null; + + @BeforeClass + public static void startMiniAccumulo() throws MongoException, IOException { + final MongodForTestsFactory mongoFactory = new MongodForTestsFactory(); + client = mongoFactory.newMongo(); + } + + @Before + public void clearLastTest() { + client.dropDatabase("testInstance"); + } + + @AfterClass + public static void stopMiniAccumulo() throws IOException, InterruptedException { + client.close(); + } + + @Test + public void initializeAndGet() throws AlreadyInitializedException, RyaDetailsRepositoryException { + final String instanceName = "testInstance"; + + // Create the metadata object the repository will be initialized with. + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName(instanceName) + .setRyaVersion("1.2.3.4") + .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) + .setGeoIndexDetails( new GeoIndexDetails(true) ) + .setTemporalIndexDetails( new TemporalIndexDetails(true) ) + .setFreeTextDetails( new FreeTextIndexDetails(true) ) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") ) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 1") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime( new Date() )) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 2"))) + .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) + .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) + .build(); + + // Setup the repository that will be tested using a mock instance of MongoDB. + final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, instanceName); + + // Initialize the repository + repo.initialize(details); + + // Fetch the stored details. + final RyaDetails stored = repo.getRyaInstanceDetails(); + + // Ensure the fetched object is equivalent to what was stored. + assertEquals(details, stored); + } + + @Test(expected = AlreadyInitializedException.class) + public void initialize_alreadyInitialized() throws AlreadyInitializedException, RyaDetailsRepositoryException { + final String instanceName = "testInstance"; + + // Create the metadata object the repository will be initialized with. + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName(instanceName) + .setRyaVersion("1.2.3.4") + .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) + .setGeoIndexDetails( new GeoIndexDetails(true) ) + .setTemporalIndexDetails( new TemporalIndexDetails(true) ) + .setFreeTextDetails( new FreeTextIndexDetails(true) ) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") ) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 1") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime( new Date() )) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 2"))) + .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) + .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) + .build(); + + // Setup the repository that will be tested using a mock instance of MongoDB. + final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, instanceName); + + // Initialize the repository + repo.initialize(details); + + // Initialize it again. + repo.initialize(details); + } + + @Test(expected = NotInitializedException.class) + public void getRyaInstance_notInitialized() throws NotInitializedException, RyaDetailsRepositoryException { + // Setup the repository that will be tested using a mock instance of Accumulo. + final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance"); + + // Try to fetch the details from the uninitialized repository. + repo.getRyaInstanceDetails(); + } + + @Test + public void isInitialized_true() throws AlreadyInitializedException, RyaDetailsRepositoryException { + final String instanceName = "testInstance"; + + // Create the metadata object the repository will be initialized with. + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName(instanceName) + .setRyaVersion("1.2.3.4") + .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) + .setGeoIndexDetails( new GeoIndexDetails(true) ) + .setTemporalIndexDetails( new TemporalIndexDetails(true) ) + .setFreeTextDetails( new FreeTextIndexDetails(true) ) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") ) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 1") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime( new Date() )) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 2"))) + .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) + .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) + .build(); + + // Setup the repository that will be tested using a mock instance of MongoDB. + final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance"); + + // Initialize the repository + repo.initialize(details); + + // Ensure the repository reports that it has been initialized. + assertTrue( repo.isInitialized() ); + } + + @Test + public void isInitialized_false() throws RyaDetailsRepositoryException { + // Setup the repository that will be tested using a mock instance of MongoDB. + final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance"); + + // Ensure the repository reports that is has not been initialized. + assertFalse( repo.isInitialized() ); + } + + @Test + public void update() throws AlreadyInitializedException, RyaDetailsRepositoryException { + final String instanceName = "testInstance"; + + // Create the metadata object the repository will be initialized with. + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName(instanceName) + .setRyaVersion("1.2.3.4") + .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) + .setGeoIndexDetails( new GeoIndexDetails(true) ) + .setTemporalIndexDetails( new TemporalIndexDetails(true) ) + .setFreeTextDetails( new FreeTextIndexDetails(true) ) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") ) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 1") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime( new Date() )) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 2"))) + .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) + .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) + .build(); + + // Setup the repository that will be tested using a mock instance of MongoDB. + final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance"); + + // Initialize the repository + repo.initialize(details); + + // Create a new state for the details. + final RyaDetails updated = new RyaDetails.Builder( details ) + .setGeoIndexDetails( new GeoIndexDetails(false) ) + .build(); + + // Execute the update. + repo.update(details, updated); + + // Show the new state that is stored matches the updated state. + final RyaDetails fetched = repo.getRyaInstanceDetails(); + assertEquals(updated, fetched); + } + + @Test(expected = ConcurrentUpdateException.class) + public void update_outOfDate() throws AlreadyInitializedException, RyaDetailsRepositoryException { + final String instanceName = "testInstance"; + + // Create the metadata object the repository will be initialized with. + final RyaDetails details = RyaDetails.builder() + .setRyaInstanceName(instanceName) + .setRyaVersion("1.2.3.4") + .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) ) + .setGeoIndexDetails( new GeoIndexDetails(true) ) + .setTemporalIndexDetails( new TemporalIndexDetails(true) ) + .setFreeTextDetails( new FreeTextIndexDetails(true) ) + .setPCJIndexDetails( + PCJIndexDetails.builder() + .setEnabled(true) + .setFluoDetails( new FluoDetails("test_instance_rya_pcj_updater") ) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 1") + .setUpdateStrategy(PCJUpdateStrategy.BATCH) + .setLastUpdateTime( new Date() )) + .addPCJDetails( + PCJDetails.builder() + .setId("pcj 2"))) + .setProspectorDetails( new ProspectorDetails(Optional.of(new Date())) ) + .setJoinSelectivityDetails( new JoinSelectivityDetails(Optional.of(new Date())) ) + .build(); + + // Setup the repository that will be tested using a mock instance of MongoDB. + final RyaDetailsRepository repo = new MongoRyaInstanceDetailsRepository(client, "testInstance"); + + // Initialize the repository + repo.initialize(details); + + // Create a new state for the details. + final RyaDetails wrongOriginal = new RyaDetails.Builder( details ) + .setTemporalIndexDetails( new TemporalIndexDetails(false) ) + .build(); + + final RyaDetails updated = new RyaDetails.Builder( details ) + .setGeoIndexDetails( new GeoIndexDetails(false) ) + .build(); + + // Try to execute the update where the old state is not the currently stored state. + repo.update(wrongOriginal, updated); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java deleted file mode 100644 index fefd651..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocIndexIteratorUtil.java +++ /dev/null @@ -1,31 +0,0 @@ -package mvm.rya.accumulo.documentIndex; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -public class DocIndexIteratorUtil { - - - - public static final String DOC_ID_INDEX_DELIM = "\u001D" + "\u001E"; - - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java deleted file mode 100644 index ad38b2b..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/DocumentIndexIntersectingIterator.java +++ /dev/null @@ -1,850 +0,0 @@ -package mvm.rya.accumulo.documentIndex; - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Map; - -import org.apache.accumulo.core.client.IteratorSetting; -import org.apache.accumulo.core.data.ArrayByteSequence; -import org.apache.accumulo.core.data.ByteSequence; -import org.apache.accumulo.core.data.Key; -import org.apache.accumulo.core.data.PartialKey; -import org.apache.accumulo.core.data.Range; -import org.apache.accumulo.core.data.Value; -import org.apache.accumulo.core.iterators.IteratorEnvironment; -import org.apache.accumulo.core.iterators.SortedKeyValueIterator; -import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; -import org.apache.accumulo.core.util.TextUtil; -import org.apache.commons.codec.binary.Base64; -import org.apache.hadoop.io.Text; -import org.apache.log4j.Logger; - -/** - * This iterator facilitates document-partitioned indexing. It involves grouping a set of documents together and indexing those documents into a single row of - * an Accumulo table. This allows a tablet server to perform boolean AND operations on terms in the index. - * - * The table structure should have the following form: - * - * row: shardID, colfam: term, colqual: docID - * - * When you configure this iterator with a set of terms (column families), it will return only the docIDs that appear with all of the specified terms. The - * result will have an empty column family, as follows: - * - * row: shardID, colfam: (empty), colqual: docID - * - * This iterator is commonly used with BatchScanner or AccumuloInputFormat, to parallelize the search over all shardIDs. - * - * This iterator will *ignore* any columnFamilies passed to {@link #seek(Range, Collection, boolean)} as it performs intersections over terms. Extending classes - * should override the {@link TermSource#seekColfams} in their implementation's {@link #init(SortedKeyValueIterator, Map, IteratorEnvironment)} method. - * - * README.shard in docs/examples shows an example of using the IntersectingIterator. - */ -public class DocumentIndexIntersectingIterator implements SortedKeyValueIterator<Key,Value> { - - - - - protected Text nullText = new Text(); - - protected Text getRow(Key key) { - return key.getRow(); - } - - protected Text getTerm(Key key) { - return key.getColumnFamily(); - } - - protected Text getTermCond(Key key) { - return key.getColumnQualifier(); - } - - protected Key buildKey(Text row, TextColumn column) { - return new Key(row, (column.getColumnFamily() == null) ? nullText: column.getColumnFamily(), column.getColumnQualifier()); - } - - protected Key buildKey(Text row, Text term) { - return new Key(row, (term == null) ? nullText : term); - } - - protected Key buildKey(Text row, Text term, Text termCond) { - return new Key(row, (term == null) ? nullText : term, termCond); - } - - protected Key buildFollowRowKey(Key key, Text term, Text termCond) { - return new Key(getRow(key.followingKey(PartialKey.ROW)),(term == null) ? nullText : term, termCond); - } - - protected static final Logger log = Logger.getLogger(DocumentIndexIntersectingIterator.class); - - public static class TermSource { - public SortedKeyValueIterator<Key, Value> iter; - public Text term; - public Text termCond; - public Collection<ByteSequence> seekColfams; - public TextColumn column; - public boolean isPrefix; - public Key top ; - public Key next ; - public Text currentCQ; - private boolean seeked = false; - - public TermSource(TermSource other) { - - this.iter = other.iter; - this.term = other.term; - this.termCond = other.termCond; - this.seekColfams = other.seekColfams; - this.column = other.column; - this.top = other.top; - this.next = other.next; - this.currentCQ = other.currentCQ; - this.isPrefix = other.isPrefix; - } - - - public TermSource(SortedKeyValueIterator<Key, Value> iter, TextColumn column) { - - this.iter = iter; - this.column = column; - this.term = column.getColumnFamily(); - this.termCond = column.getColumnQualifier(); - this.currentCQ = new Text(emptyByteArray); - this.seekColfams = Collections.<ByteSequence> singletonList(new ArrayByteSequence(term - .getBytes(), 0, term.getLength())); - - } - - - - public void seek(Range r) throws IOException { - - if (seeked) { - - if (next != null && !r.beforeStartKey(next)) { - if (next.getColumnFamily().equals(term)) { - this.updateTop(); - } - } else if (iter.hasTop()) { - iter.seek(r, seekColfams, true); - this.updateTopNext(); - } else { - top = null; - next = null; - - } - } else { - - iter.seek(r, seekColfams, true); - this.updateTopNext(); - seeked = true; - } - - } - - - public void next() throws IOException { - - this.updateTop(); - } - - public void updateTop() throws IOException { - - top = next; - if (next != null) { - iter.next(); - if (iter.hasTop()) { - next = iter.getTopKey(); - } else { - next = null; - } - } - - } - - public void updateTopNext() throws IOException { - - if (iter.hasTop()) { - top = iter.getTopKey(); - } else { - top = null; - next = null; - return; - } - - iter.next(); - - if(iter.hasTop()) { - next = iter.getTopKey(); - } else { - next = null; - } - } - - public boolean hasTop() { - return top != null; - } - - - public String getTermString() { - return (this.term == null) ? new String("Iterator") : this.term.toString(); - } - } - - TermSource[] sources; - int sourcesCount = 0; - Range overallRange; - - // query-time settings - protected Text currentRow = null; - protected Text currentTermCond = new Text(emptyByteArray); - static final byte[] emptyByteArray = new byte[0]; - - protected Key topKey = null; - protected Value value = new Value(emptyByteArray); - protected String ctxt = null; - protected boolean hasContext = false; - protected boolean termCondSet = false; - - public DocumentIndexIntersectingIterator() {} - - @Override - public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { - //log.info("Calling deep copy on " + this); - return new DocumentIndexIntersectingIterator(this, env); - } - - private DocumentIndexIntersectingIterator(DocumentIndexIntersectingIterator other, IteratorEnvironment env) { - if (other.sources != null) { - sourcesCount = other.sourcesCount; - sources = new TermSource[sourcesCount]; - for (int i = 0; i < sourcesCount; i++) { - sources[i] = new TermSource(other.sources[i].iter.deepCopy(env), other.sources[i].column); - } - } - } - - @Override - public Key getTopKey() { - - return topKey; - } - - @Override - public Value getTopValue() { - // we don't really care about values - return value; - } - - @Override - public boolean hasTop() { - return currentRow != null; - } - - // precondition: currentRow is not null - private boolean seekOneSource(int sourceID) throws IOException { - // find the next key in the appropriate column family that is at or - // beyond the cursor (currentRow, currentCQ) - // advance the cursor if this source goes beyond it - // return whether we advanced the cursor - - // within this loop progress must be made in one of the following forms: - // - currentRow or currentCQ must be increased - // - the given source must advance its iterator - // this loop will end when any of the following criteria are met - // - the iterator for the given source is pointing to the key - // (currentRow, columnFamilies[sourceID], currentCQ) - // - the given source is out of data and currentRow is set to null - // - the given source has advanced beyond the endRow and currentRow is - // set to null - boolean advancedCursor = false; - - - - - - while (true) { - -// if(currentRow.toString().equals(s)) { -// log.info("Source id is " + sourceID); -// if (sources[sourceID].top != null) { -// log.info("Top row is " + getRow(sources[sourceID].top)); -// log.info("Top cq is " + getTermCond(sources[sourceID].top)); -// } -// if (sources[sourceID].next != null) { -// log.info("Next row is " + getRow(sources[sourceID].next)); -// log.info("Next termCond is " + getTermCond(sources[sourceID].next)); -// } -// } - - if (sources[sourceID].hasTop() == false) { - currentRow = null; - // setting currentRow to null counts as advancing the cursor - return true; - } - // check if we're past the end key - int endCompare = -1; - // we should compare the row to the end of the range - - if (overallRange.getEndKey() != null) { - endCompare = overallRange.getEndKey().getRow().compareTo(sources[sourceID].top.getRow()); - if ((!overallRange.isEndKeyInclusive() && endCompare <= 0) || endCompare < 0) { - currentRow = null; - // setting currentRow to null counts as advancing the cursor - return true; - } - } - - - - int rowCompare = currentRow.compareTo(getRow(sources[sourceID].top)); - // check if this source is already at or beyond currentRow - // if not, then seek to at least the current row - - - - if (rowCompare > 0) { - // seek to at least the currentRow - Key seekKey = buildKey(currentRow, sources[sourceID].term); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - - continue; - } - // check if this source has gone beyond currentRow - // if so, advance currentRow - if (rowCompare < 0) { - currentRow.set(getRow(sources[sourceID].top)); - //log.info("Current row is " + currentRow); - advancedCursor = true; - continue; - } - // we have verified that the current source is positioned in - // currentRow - // now we must make sure we're in the right columnFamily in the - // current row - // Note: Iterators are auto-magically set to the correct - // columnFamily - - if (sources[sourceID].column.isValid()) { - - boolean isPrefix = false; - boolean contextEqual = false; - String tempContext = ""; - - int termCompare; - - String[] cQ = getTermCond(sources[sourceID].top).toString().split("\u0000"); - tempContext = cQ[0]; - - if (!hasContext && ctxt == null) { - ctxt = cQ[0]; - } - - contextEqual = ctxt.equals(cQ[0]); - - String s1 = sources[sourceID].termCond.toString(); - String s2 = cQ[1] + "\u0000" + cQ[2]; - - if (sources[sourceID].isPrefix) { - isPrefix = s2.startsWith(s1 + "\u0000"); - } else { - isPrefix = s2.startsWith(s1); - } - - termCompare = (contextEqual && isPrefix) ? 0 : (ctxt + "\u0000" + s1).compareTo(cQ[0] + "\u0000" + s2); - - // if(currentRow.toString().equals(s)) { - // log.info("Term compare is " + termCompare); - // } - - // check if this source is already on the right columnFamily - // if not, then seek forwards to the right columnFamily - if (termCompare > 0) { - Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt + - "\u0000" + sources[sourceID].termCond.toString())); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - - continue; - } - // check if this source is beyond the right columnFamily - // if so, then seek to the next row - if (termCompare < 0) { - // we're out of entries in the current row, so seek to the - // next one - - if (endCompare == 0) { - // we're done - currentRow = null; - // setting currentRow to null counts as advancing the - // cursor - return true; - } - - - - //advance to next row if context set - all entries in given row exhausted - if (hasContext || tempContext.length() == 0) { - Key seekKey = buildFollowRowKey(sources[sourceID].top, sources[sourceID].term, - new Text(ctxt + "\u0000" + sources[sourceID].termCond.toString())); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - } else { - - if(contextEqual && !isPrefix) { - Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(ctxt + "\u0001")); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - if(sources[sourceID].top != null) { - ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0]; - } - } else { - Key seekKey = buildKey(currentRow, sources[sourceID].term, new Text(tempContext + - "\u0000" + sources[sourceID].termCond.toString())); - sources[sourceID].seek(new Range(seekKey, true, null, false)); - if(sources[sourceID].top != null) { - ctxt = getTermCond(sources[sourceID].top).toString().split("\u0000")[0]; - } - } - - } - - -// if(currentRow.toString().equals(s)) { -// log.info("current term cond is " + currentTermCond); -// -// } - - - continue; - } - } - - - - - - - - - - - //set currentTermCond -- gets appended to end of currentKey column qualifier - //used to determine which term iterator to advance when a new iterator is created - - sources[sourceID].currentCQ.set(getTermCond(sources[sourceID].top)); - - if (sources[sourceID].next != null) { - - //is hasContext, only consider sourceID with next having designated context - //otherwise don't set currentTermCond - if (!termCondSet && hasContext) { - if (sources[sourceID].next.getRow().equals(currentRow) - && sources[sourceID].next.getColumnQualifier().toString() - .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) { - currentTermCond.set(new Text(Integer.toString(sourceID))); - termCondSet = true; - } - } else if(!termCondSet){ - String[] cq = getTermCond(sources[sourceID].next).toString().split("\u0000"); - - //set currentTermCond with preference given to sourceID having next with same context - //otherwise set currentTermCond sourceID with next having termCond as prefix - if (sources[sourceID].next.getRow().equals(currentRow)) { - if (sources[sourceID].next.getColumnQualifier().toString() - .startsWith(ctxt + "\u0000" + sources[sourceID].termCond.toString())) { - currentTermCond.set(new Text(Integer.toString(sourceID))); - termCondSet = true; - } else if ((cq[1] + "\u0000" + cq[2]).startsWith(sources[sourceID].termCond.toString())) { - currentTermCond.set(new Text(Integer.toString(sourceID))); - } - } - } - } - - - break; - } - - return advancedCursor; - } - - @Override - public void next() throws IOException { - if (currentRow == null) { - return; - } - - - - if(currentTermCond.getLength() != 0) { - - int id = Integer.parseInt(currentTermCond.toString()); - - sources[id].next(); - currentTermCond.set(emptyByteArray); - termCondSet = false; - if(sources[id].top != null && !hasContext) { - ctxt = getTermCond(sources[id].top).toString().split("\u0000")[0]; - } - advanceToIntersection(); - return; - } - - sources[0].next(); - if(sources[0].top != null && !hasContext) { - ctxt = getTermCond(sources[0].top).toString().split("\u0000")[0]; - } - advanceToIntersection(); - } - - protected void advanceToIntersection() throws IOException { - boolean cursorChanged = true; - while (cursorChanged) { - // seek all of the sources to at least the highest seen column qualifier in the current row - cursorChanged = false; - for (int i = 0; i < sourcesCount; i++) { -// log.info("New sourceID is " + i); - if (currentRow == null) { - topKey = null; - return; - } - if (seekOneSource(i)) { - currentTermCond.set(emptyByteArray); - termCondSet = false; - cursorChanged = true; - break; - } - } - } - String cq = ""; - for(int i = 0; i < sourcesCount; i++) { - cq = cq + sources[i].currentCQ.toString() + DocIndexIteratorUtil.DOC_ID_INDEX_DELIM; - } - - if (currentTermCond.getLength() == 0) { - topKey = buildKey(currentRow, nullText, new Text(cq + -1)); - } else { - topKey = buildKey(currentRow, nullText, new Text(cq + currentTermCond.toString())); - } - } - - public static String stringTopKey(SortedKeyValueIterator<Key,Value> iter) { - if (iter.hasTop()) - return iter.getTopKey().toString(); - return ""; - } - - private static final String columnOptionName = "columns"; - private static final String columnPrefix = "prefixes"; - private static final String context = "context"; - - - - protected static String encodeColumns(TextColumn[] columns) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < columns.length; i++) { - sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnFamily())))); - sb.append('\n'); - sb.append(new String(Base64.encodeBase64(TextUtil.getBytes(columns[i].getColumnQualifier())))); - sb.append('\u0001'); - } - return sb.toString(); - } - - - - protected static TextColumn[] decodeColumns(String columns) { - String[] columnStrings = columns.split("\u0001"); - TextColumn[] columnTexts = new TextColumn[columnStrings.length]; - for (int i = 0; i < columnStrings.length; i++) { - String[] columnComponents = columnStrings[i].split("\n"); - columnTexts[i] = new TextColumn(new Text(Base64.decodeBase64(columnComponents[0].getBytes())), - new Text(Base64.decodeBase64(columnComponents[1].getBytes()))); - } - return columnTexts; - } - - - - - - /** - * @param context - * @return encoded context - */ - protected static String encodeContext(String context) { - - return new String(Base64.encodeBase64(context.getBytes())); - } - - - - /** - * @param context - * @return decoded context - */ - protected static String decodeContext(String context) { - - if (context == null) { - return null; - } else { - return new String(Base64.decodeBase64(context.getBytes())); - } - } - - - - - - protected static String encodeBooleans(boolean[] prefixes) { - byte[] bytes = new byte[prefixes.length]; - for (int i = 0; i < prefixes.length; i++) { - if (prefixes[i]) - bytes[i] = 1; - else - bytes[i] = 0; - } - return new String(Base64.encodeBase64(bytes)); - } - - /** - * @param flags - * @return decoded flags - */ - protected static boolean[] decodeBooleans(String prefixes) { - // return null of there were no flags - if (prefixes == null) - return null; - - byte[] bytes = Base64.decodeBase64(prefixes.getBytes()); - boolean[] bFlags = new boolean[bytes.length]; - for (int i = 0; i < bytes.length; i++) { - if (bytes[i] == 1) - bFlags[i] = true; - else - bFlags[i] = false; - } - return bFlags; - } - - - - - - - - - @Override - public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { - TextColumn[] terms = decodeColumns(options.get(columnOptionName)); - boolean[] prefixes = decodeBooleans(options.get(columnPrefix)); - ctxt = decodeContext(options.get(context)); - - if(ctxt != null) { - hasContext = true; - } - - - - if (terms.length < 2) { - throw new IllegalArgumentException("IntersectionIterator requires two or more columns families"); - } - - sources = new TermSource[terms.length]; - sources[0] = new TermSource(source, terms[0]); - for (int i = 1; i < terms.length; i++) { - //log.info("For decoded column " + i + " column family is " + terms[i].getColumnFamily() + " and qualifier is " + terms[i].getColumnQualifier()); - sources[i] = new TermSource(source.deepCopy(env), terms[i]); - sources[i].isPrefix = prefixes[i]; - } - sourcesCount = terms.length; - } - - @Override - public void seek(Range range, Collection<ByteSequence> seekColumnFamilies, boolean inclusive) throws IOException { - overallRange = new Range(range); - currentRow = new Text(); - currentTermCond.set(emptyByteArray); - termCondSet = false; - - - -// log.info("Calling seek with range " + range); - - // seek each of the sources to the right column family within the row - // given by key - - Key sourceKey; - - if (rangeCqValid(range)) { - - String[] cqInfo = cqParser(range.getStartKey().getColumnQualifier()); - int id = Integer.parseInt(cqInfo[1]); - - - - if (id >= 0) { - for (int i = 0; i < sourcesCount; i++) { - - if (i == id) { - sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, new Text(cqInfo[0])); - sources[i].seek(new Range(sourceKey, true, null, false)); - sources[i].next(); - if(!hasContext && sources[i].hasTop()) { - ctxt = getTermCond(sources[i].top).toString().split("\u0000")[0]; - } - } else { - sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term); - sources[i].seek(new Range(sourceKey, true, null, false)); - } - } - } else { - - - for (int i = 0; i < sourcesCount; i++) { - sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term, range.getStartKey() - .getColumnQualifier()); - sources[i].seek(new Range(sourceKey, true, null, false)); - } - } - - - } else { - -// log.info("Range is invalid."); - for (int i = 0; i < sourcesCount; i++) { - - if (range.getStartKey() != null) { - - sourceKey = buildKey(getRow(range.getStartKey()), sources[i].term); - - // Seek only to the term for this source as a column family - sources[i].seek(new Range(sourceKey, true, null, false)); - } else { - // Seek only to the term for this source as a column family - - sources[i].seek(range); - } - } - } - - advanceToIntersection(); - - } - - - private String[] cqParser(Text cq) { - - String cQ = cq.toString(); - String[] cqComponents = cQ.split(DocIndexIteratorUtil.DOC_ID_INDEX_DELIM); - int id = -1; - String[] valPos = new String[2]; - - - - - if(cqComponents.length > 1) { - id = Integer.parseInt(cqComponents[cqComponents.length-1]); - if (id >= 0) { - valPos[0] = cqComponents[id].toString(); - valPos[1] = "" + id; - } else { - valPos[0] = cqComponents[0].toString(); - valPos[1] = "" + id; - } - } else { - valPos[0] = cq.toString(); - valPos[1] = "" + -1; - } - - return valPos; - - } - - - private boolean rangeCqValid(Range range) { - return (range.getStartKey() != null) && (range.getStartKey().getColumnQualifier() != null); - } - - - - public void addSource(SortedKeyValueIterator<Key,Value> source, IteratorEnvironment env, TextColumn column) { - // Check if we have space for the added Source - if (sources == null) { - sources = new TermSource[1]; - } else { - // allocate space for node, and copy current tree. - // TODO: Should we change this to an ArrayList so that we can just add() ? - ACCUMULO-1309 - TermSource[] localSources = new TermSource[sources.length + 1]; - int currSource = 0; - for (TermSource myTerm : sources) { - // TODO: Do I need to call new here? or can I just re-use the term? - ACCUMULO-1309 - localSources[currSource] = new TermSource(myTerm); - currSource++; - } - sources = localSources; - } - sources[sourcesCount] = new TermSource(source.deepCopy(env), column); - sourcesCount++; - } - - /** - * Encode the columns to be used when iterating. - * - * @param cfg - * @param columns - */ - public static void setColumnFamilies(IteratorSetting cfg, TextColumn[] columns) { - if (columns.length < 2) - throw new IllegalArgumentException("Must supply at least two terms to intersect"); - - boolean[] prefix = new boolean[columns.length]; - - for(int i = 0; i < columns.length; i++) { - prefix[i] = columns[i].isPrefix(); - } - - - - cfg.addOption(DocumentIndexIntersectingIterator.columnPrefix, DocumentIndexIntersectingIterator.encodeBooleans(prefix)); - cfg.addOption(DocumentIndexIntersectingIterator.columnOptionName, DocumentIndexIntersectingIterator.encodeColumns(columns)); - } - - - - - - public static void setContext(IteratorSetting cfg, String context) { - - cfg.addOption(DocumentIndexIntersectingIterator.context, DocumentIndexIntersectingIterator.encodeContext(context)); - - } - - - - - - - - - - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java b/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java deleted file mode 100644 index 661f62b..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/documentIndex/TextColumn.java +++ /dev/null @@ -1,108 +0,0 @@ -package mvm.rya.accumulo.documentIndex; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -import org.apache.hadoop.io.Text; - -public class TextColumn { - - - private Text columnFamily; - private Text columnQualifier; - private boolean isPrefix = false; - - - - public TextColumn(Text columnFamily, Text columnQualifier) { - this.columnFamily = columnFamily; - this.columnQualifier = columnQualifier; - } - - - public TextColumn(TextColumn other) { - - this.columnFamily = new Text(other.columnFamily); - this.columnQualifier = new Text(other.columnQualifier); - this.isPrefix = other.isPrefix; - - } - - - public Text getColumnFamily() { - return columnFamily; - } - - - public boolean isPrefix() { - return isPrefix; - } - - - public void setIsPrefix(boolean isPrefix) { - this.isPrefix = isPrefix; - } - - - public boolean isValid() { - return (columnFamily != null && columnQualifier != null); - } - - - - public Text getColumnQualifier() { - return columnQualifier; - } - - - public void setColumnFamily(Text cf) { - this.columnFamily = cf; - } - - public void setColumnQualifier(Text cq) { - this.columnQualifier = cq; - } - - public String toString() { - - return columnFamily.toString() + ", " + columnQualifier.toString() + ", prefix:" + isPrefix; - } - - @Override - public boolean equals(Object other) { - - if(other == null) { - return false; - } - - if(!(other instanceof TextColumn)) { - return false; - } - - TextColumn tc = (TextColumn) other; - - return this.columnFamily.equals(tc.columnFamily) && this.columnQualifier.equals(tc.columnQualifier) && this.isPrefix == tc.isPrefix; - - - - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java deleted file mode 100644 index 0966903..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/BindingSetHashJoinIterator.java +++ /dev/null @@ -1,324 +0,0 @@ -package mvm.rya.accumulo.pcj.iterators; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import info.aduna.iteration.CloseableIteration; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.NoSuchElementException; -import java.util.Set; - -import mvm.rya.indexing.external.tupleSet.ExternalTupleSet; - -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; -import org.openrdf.query.algebra.evaluation.QueryBindingSet; - -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; - -/** - * This {@link CloseableIteration} performs a hash join by joining each - * {@link Map.Entry<String, BindingSet>} with all corresponding - * {@link BindingSet} in a Multimap with the same String key. - * - */ -public class BindingSetHashJoinIterator implements - CloseableIteration<BindingSet, QueryEvaluationException> { - - //BindingSets passed to PCJ mapped according to values - //associated with common variables with table - private Multimap<String, BindingSet> bindingJoinVarHash; - //BindingSets taken from PCJ table - private CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter; - private Iterator<BindingSet> joinedBindingSets = Collections - .emptyIterator(); - //If PCJ contains LeftJoin, this is a set of variable in LeftJoin. Used when performing Join. - private Set<String> unAssuredVariables; - //indicates when HashJoin formed from a single collection of join variable or if the size and - //collection of join variables varies -- this is to optimize the join process - private HashJoinType type; - private final BindingSet EMPTY_BINDINGSET = new QueryBindingSet(); - private BindingSet next; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - - /** - * Enum type to indicate whether HashJoin will be performed over a fixed - * subset of variables common to each {@link BindingSet}, or if there is a - * collection of variable subsets over which to join. - * - */ - public enum HashJoinType { - CONSTANT_JOIN_VAR, VARIABLE_JOIN_VAR - }; - - public BindingSetHashJoinIterator( - Multimap<String, BindingSet> bindingJoinVarHash, - CloseableIteration<Map.Entry<String, BindingSet>, QueryEvaluationException> joinIter, - Set<String> unAssuredVariables, HashJoinType type) { - this.bindingJoinVarHash = bindingJoinVarHash; - this.joinIter = joinIter; - this.type = type; - this.unAssuredVariables = unAssuredVariables; - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - if (!hasNextCalled && !isEmpty) { - while (joinedBindingSets.hasNext() || joinIter.hasNext()) { - if (!joinedBindingSets.hasNext()) { - Entry<String, BindingSet> entry = joinIter.next(); - joinedBindingSets = joinBindingSetEntry(entry); - } - if (!joinedBindingSets.hasNext()) { - continue; - } - next = joinedBindingSets.next(); - hasNextCalled = true; - return true; - } - - isEmpty = true; - return false; - } else if (isEmpty) { - return false; - } else { - return true; - } - } - - @Override - public BindingSet next() throws QueryEvaluationException { - if (hasNextCalled) { - hasNextCalled = false; - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - } else { - throw new NoSuchElementException(); - } - } - return next; - } - - @Override - public void remove() throws QueryEvaluationException { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws QueryEvaluationException { - joinIter.close(); - } - - /** - * This method takes the valOrderString, which is a key used for computing - * hash joins, and generates multiple keys by pulling off one delimiter - * separated component at a time. This is used when the size of the join key - * varies from {@link Map.Entry} to Entry. It allows the BindingSet to be - * joined using all prefixes of the key. - * - * @param valOrderString - * - key used for hash join - * @return - */ - private List<String> getValueOrders(String valOrderString) { - - List<String> valueOrders = new ArrayList<>(); - String[] splitValOrderString = valOrderString - .split(ExternalTupleSet.VALUE_DELIM); - StringBuffer buffer = new StringBuffer(); - buffer.append(splitValOrderString[0]); - valueOrders.add(buffer.substring(0)); - - for (int i = 1; i < splitValOrderString.length; i++) { - buffer.append(ExternalTupleSet.VALUE_DELIM + splitValOrderString[i]); - valueOrders.add(buffer.substring(0)); - } - - return valueOrders; - } - - /** - * This method verifies that all common variables have a common value and - * then joins the BindingSets together. In the case that the PCJ contains a - * LeftJoin, if the leftBs and rightBs have a common variable with distinct - * values and that common variable is unassured (only appears in LeftJoin), - * this method uses the value corresponding to leftBs. - * - * @param leftBs - * - BindingSet passed into PCJ - * @param rightBs - * - PCJ BindingSet - * @return - joined BindingSet - */ - private BindingSet joinBindingSets(BindingSet leftBs, BindingSet rightBs) { - - Set<String> commonVars = Sets.intersection(leftBs.getBindingNames(), - rightBs.getBindingNames()); - // compare values associated with common variables to make sure - // BindingSets can be joined. Possible for leftBs and rightBs - // to have a common unAssuredVariable in event PCJ contains LeftJoin. - // if values corresponding to common unAssuredVariable do not agree - // add value corresponding to leftBs - for (String s : commonVars) { - if (!leftBs.getValue(s).equals(rightBs.getValue(s)) - && !unAssuredVariables.contains(s)) { - return EMPTY_BINDINGSET; - } - } - QueryBindingSet bs = new QueryBindingSet(removeConstants(leftBs)); - - rightBs = removeConstants(rightBs); - // only add Bindings corresponding to variables that have no value - // assigned. This takes into account case where leftBs and rightBs - // share a common, unAssuredVariable. In this case, use value - // corresponding - // to leftBs, which is effectively performing a LeftJoin. - for (String s : rightBs.getBindingNames()) { - if (bs.getValue(s) == null) { - bs.addBinding(s, rightBs.getValue(s)); - } - } - - return bs; - } - - private BindingSet removeConstants(BindingSet bs) { - QueryBindingSet bSet = new QueryBindingSet(); - for (String s : bs.getBindingNames()) { - if (!s.startsWith(ExternalTupleSet.CONST_PREFIX)) { - bSet.addBinding(bs.getBinding(s)); - } - } - return bSet; - } - - /** - * This method returns an Iterator which joins the given Entry's BindingSet - * to all BindingSets which matching the Entry's key. - * - * @param entry - entry to be joined - * @return - Iterator over joined BindingSets - */ - private Iterator<BindingSet> joinBindingSetEntry( - Map.Entry<String, BindingSet> entry) { - - List<Collection<BindingSet>> matches = new ArrayList<>(); - if (type == HashJoinType.CONSTANT_JOIN_VAR) { - if (bindingJoinVarHash.containsKey(entry.getKey())) { - matches.add(bindingJoinVarHash.get(entry.getKey())); - } - } else { - List<String> valOrders = getValueOrders(entry.getKey()); - for (String s : valOrders) { - if (bindingJoinVarHash.containsKey(s)) { - matches.add(bindingJoinVarHash.get(s)); - } - } - } - - if (matches.size() == 0) { - return Collections.emptyIterator(); - } else { - return new BindingSetCollectionsJoinIterator(entry.getValue(), - matches); - } - - } - - /** - * Given a BindingSet and a List of Collections of BindingSets, this - * Iterator joins the BindingSet with the BindingSets in each Collection - * - */ - private class BindingSetCollectionsJoinIterator implements - Iterator<BindingSet> { - - private Iterator<Collection<BindingSet>> collectionIter; - private Iterator<BindingSet> bsIter = Collections.emptyIterator(); - private BindingSet next; - private BindingSet joinBs; - private boolean hasNextCalled = false; - private boolean isEmpty = false; - - public BindingSetCollectionsJoinIterator(BindingSet bs, - List<Collection<BindingSet>> collection) { - this.collectionIter = collection.iterator(); - this.joinBs = bs; - } - - @Override - public boolean hasNext() { - - if (!hasNextCalled && !isEmpty) { - while (bsIter.hasNext() || collectionIter.hasNext()) { - if (!bsIter.hasNext()) { - bsIter = collectionIter.next().iterator(); - } - next = joinBindingSets(bsIter.next(), joinBs); - if (next == EMPTY_BINDINGSET) { - continue; - } - hasNextCalled = true; - return true; - } - isEmpty = true; - return false; - } else if (isEmpty) { - return false; - } else { - return true; - } - } - - @Override - public BindingSet next() { - if (hasNextCalled) { - hasNextCalled = false; - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - } else { - throw new NoSuchElementException(); - } - } - return next; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/44a2dcf0/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java b/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java deleted file mode 100644 index 2407865..0000000 --- a/extras/indexing/src/main/java/mvm/rya/accumulo/pcj/iterators/IteratorCombiner.java +++ /dev/null @@ -1,107 +0,0 @@ -package mvm.rya.accumulo.pcj.iterators; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import info.aduna.iteration.CloseableIteration; - -import java.util.Collection; -import java.util.Iterator; -import java.util.NoSuchElementException; - -import org.openrdf.query.BindingSet; -import org.openrdf.query.QueryEvaluationException; - -import com.google.common.base.Preconditions; - -/** - * This {@link CloseableIteration} takes in a list of CloseableIterations - * and merges them together into a single CloseableIteration. - * - */ -public class IteratorCombiner implements - CloseableIteration<BindingSet, QueryEvaluationException> { - - - private Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators; - private Iterator<CloseableIteration<BindingSet, QueryEvaluationException>> iteratorIterator; - private CloseableIteration<BindingSet, QueryEvaluationException> currIter; - private boolean isEmpty = false; - private boolean hasNextCalled = false; - private BindingSet next; - - public IteratorCombiner(Collection<CloseableIteration<BindingSet, QueryEvaluationException>> iterators) { - Preconditions.checkArgument(iterators.size() > 0); - this.iterators = iterators; - iteratorIterator = iterators.iterator(); - currIter = iteratorIterator.next(); - } - - @Override - public boolean hasNext() throws QueryEvaluationException { - if (!hasNextCalled && !isEmpty) { - while (currIter.hasNext() || iteratorIterator.hasNext()) { - if(!currIter.hasNext()) { - currIter = iteratorIterator.next(); - } - if(!currIter.hasNext()) { - continue; - } - next = currIter.next(); - hasNextCalled = true; - return true; - } - isEmpty = true; - return false; - } else if (isEmpty) { - return false; - } else { - return true; - } - } - - @Override - public BindingSet next() throws QueryEvaluationException { - if (hasNextCalled) { - hasNextCalled = false; - } else if (isEmpty) { - throw new NoSuchElementException(); - } else { - if (this.hasNext()) { - hasNextCalled = false; - } else { - throw new NoSuchElementException(); - } - } - return next; - } - - @Override - public void remove() throws QueryEvaluationException { - throw new UnsupportedOperationException(); - } - - @Override - public void close() throws QueryEvaluationException { - for(CloseableIteration<BindingSet, QueryEvaluationException> iterator: iterators) { - iterator.close(); - } - } - -}
