http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.integration/src/test/java/org/apache/rya/indexing/export/StoreToStoreIT.java
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.integration/src/test/java/org/apache/rya/indexing/export/StoreToStoreIT.java b/extras/rya.export/export.integration/src/test/java/org/apache/rya/indexing/export/StoreToStoreIT.java
new file mode 100644
index 0000000..2eef621
--- /dev/null
+++ b/extras/rya.export/export.integration/src/test/java/org/apache/rya/indexing/export/StoreToStoreIT.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.indexing.export;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.export.InstanceType;
+import org.apache.rya.export.accumulo.AccumuloRyaStatementStore;
+import org.apache.rya.export.accumulo.policy.TimestampPolicyAccumuloRyaStatementStore;
+import org.apache.rya.export.accumulo.util.AccumuloInstanceDriver;
+import org.apache.rya.export.api.metadata.ParentMetadataDoesNotExistException;
+import org.apache.rya.export.api.store.AddStatementException;
+import org.apache.rya.export.api.store.FetchStatementException;
+import org.apache.rya.export.api.store.RyaStatementStore;
+import org.apache.rya.export.client.merge.MemoryTimeMerger;
+import org.apache.rya.export.client.merge.VisibilityStatementMerger;
+import org.apache.rya.export.mongo.MongoRyaStatementStore;
+import org.apache.rya.export.mongo.policy.TimestampPolicyMongoRyaStatementStore;
+import org.apache.rya.mongodb.MongoDBRyaDAO;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.mongodb.MongoClient;
+
+/**
+ * Integration test that merges statements between every parent/child pairing
+ * of Mongo and Accumulo backed {@link RyaStatementStore}s.
+ */
+@RunWith(Parameterized.class)
+public class StoreToStoreIT extends ITBase {
+    private static final String RYA_INSTANCE = "ryaInstance";
+    private static final InstanceType type = InstanceType.MOCK;
+    private static final String tablePrefix = "accumuloTest";
+    private static final String auths = "U";
+
+    private final RyaStatementStore parentStore;
+    private final RyaStatementStore childStore;
+    private final static List<MongoClient> clients = new ArrayList<>();
+    private final static List<AccumuloInstanceDriver> drivers = new ArrayList<>();
+    private static Date currentDate;
+
+    private static TimestampPolicyMongoRyaStatementStore getParentMongo() throws Exception {
+        final MongoClient mongo = getNewMongoResources(RYA_INSTANCE);
+        final MongoDBRyaDAO dao = new MongoDBRyaDAO(ITBase.getConf(mongo), mongo);
+        final MongoRyaStatementStore store = new MongoRyaStatementStore(mongo, RYA_INSTANCE, dao);
+        final TimestampPolicyMongoRyaStatementStore timeStore = new TimestampPolicyMongoRyaStatementStore(store, currentDate, RYA_INSTANCE);
+        clients.add(mongo);
+        return timeStore;
+    }
+
+    private static MongoRyaStatementStore getChildMongo() throws Exception {
+        final MongoClient mongo = getNewMongoResources(RYA_INSTANCE);
+        final MongoDBRyaDAO dao = new MongoDBRyaDAO(ITBase.getConf(mongo), mongo);
+        final MongoRyaStatementStore store = new MongoRyaStatementStore(mongo, RYA_INSTANCE, dao);
+        clients.add(mongo);
+        return store;
+    }
+
+    private static TimestampPolicyAccumuloRyaStatementStore getParentAccumulo() throws Exception {
+        final AccumuloInstanceDriver driver = new AccumuloInstanceDriver(RYA_INSTANCE, type, true, false, true, "TEST1", PASSWORD, RYA_INSTANCE, tablePrefix, auths, "");
+        driver.setUp();
+        final AccumuloRyaStatementStore store = new AccumuloRyaStatementStore(driver.getDao(), tablePrefix, RYA_INSTANCE);
+        drivers.add(driver);
+        return new TimestampPolicyAccumuloRyaStatementStore(store, currentDate);
+    }
+
+    private static AccumuloRyaStatementStore getChildAccumulo() throws Exception {
+        final AccumuloInstanceDriver driver = new AccumuloInstanceDriver(RYA_INSTANCE, type, true, false, false, "TEST2", PASSWORD, RYA_INSTANCE+"_child", tablePrefix, auths, "");
+        driver.setUp();
+        drivers.add(driver);
+        return new AccumuloRyaStatementStore(driver.getDao(), tablePrefix, RYA_INSTANCE);
+    }
+
+    @Before
+    public void clearDBS() throws Exception {
+        // cleanupTables() tears every driver down after each test, so rebuild it before the next one.
+        for(final AccumuloInstanceDriver driver : drivers) {
+            driver.setUpInstance();
+            driver.setUpTables();
+            driver.setUpDao();
+            driver.setUpConfig();
+        }
+    }
+
+    @After
+    public void cleanupTables() throws Exception {
+        for(final AccumuloInstanceDriver driver : drivers) {
+            driver.tearDown();
+        }
+        for(final MongoClient client : clients) {
+            client.dropDatabase(RYA_INSTANCE);
+        }
+    }
+
+    @AfterClass
+    public static void shutdown() throws Exception {
+        for(final AccumuloInstanceDriver driver : drivers) {
+            driver.tearDown();
+        }
+        for(final MongoClient client : clients) {
+            client.close();
+        }
+    }
+
+    /**
+     * @return Every parent/child combination of Mongo and Accumulo stores to run the tests against.
+     */
+    @Parameterized.Parameters
+    public static Collection<Object[]> instancesToTest() throws Exception {
+        currentDate = new Date();
+        final Collection<Object[]> stores = new ArrayList<>();
+        stores.add(new Object[]{getParentMongo(), getChildMongo()});
+        stores.add(new Object[]{getParentMongo(), getChildAccumulo()});
+        stores.add(new Object[]{getParentAccumulo(), getChildMongo()});
+        stores.add(new Object[]{getParentAccumulo(), getChildAccumulo()});
+        return stores;
+    }
+
+    public StoreToStoreIT(final RyaStatementStore parentStore,
+            final RyaStatementStore childStore) {
+        this.parentStore = parentStore;
+        this.childStore = childStore;
+    }
+
+    @Test
+    public void cloneTest() throws AddStatementException, FetchStatementException, ParentMetadataDoesNotExistException {
+        loadMockStatements(parentStore, 50, new Date(currentDate.getTime() + 10000L));
+
+        final MemoryTimeMerger merger = new MemoryTimeMerger(parentStore, childStore,
+                new VisibilityStatementMerger(), currentDate, RYA_INSTANCE, 0L);
+        merger.runJob();
+        assertEquals(50, count(childStore));
+    }
+
+    @Test
+    public void no_statementsTest() throws AddStatementException, FetchStatementException {
+        loadMockStatements(parentStore, 50, new Date(0L));
+
+        assertEquals(0, count(childStore));
+        final MemoryTimeMerger merger = new MemoryTimeMerger(parentStore, childStore,
+                new VisibilityStatementMerger(), currentDate, RYA_INSTANCE, 0L);
+        merger.runJob();
+        assertEquals(0, count(childStore));
+    }
+
+    @Test
+    public void childToParent_ChildAddTest() throws AddStatementException, FetchStatementException {
+        loadMockStatements(parentStore, 50, new Date(currentDate.getTime() + 100L));
+
+        //setup child
+        final MemoryTimeMerger merger = new MemoryTimeMerger(parentStore, childStore,
+                new VisibilityStatementMerger(), currentDate, RYA_INSTANCE, 0L);
+        merger.runJob();
+
+        //add a few statements to child
+        final RyaStatement stmnt1 = makeRyaStatement("http://subject", "http://predicate", "http://51");
+        final RyaStatement stmnt2 = makeRyaStatement("http://subject", "http://predicate", "http://52");
+        childStore.addStatement(stmnt1);
+        childStore.addStatement(stmnt2);
+
+        final MemoryTimeMerger otherMerger = new MemoryTimeMerger(childStore, parentStore,
+                new VisibilityStatementMerger(), currentDate, RYA_INSTANCE, 0L);
+        otherMerger.runJob();
+        assertEquals(52, count(parentStore));
+    }
+
+    @Test
+    public void childToParent_ChildReAddsDeletedStatementTest() throws Exception {
+        loadMockStatements(parentStore, 50, new Date(currentDate.getTime() + 10000L));
+
+        //setup child
+        final MemoryTimeMerger merger = new MemoryTimeMerger(parentStore, childStore,
+                new VisibilityStatementMerger(), currentDate, RYA_INSTANCE, 0L);
+        merger.runJob();
+
+        //remove a statement from the parent
+        final RyaStatement stmnt1 = makeRyaStatement("http://subject", "http://predicate", "http://1");
+        parentStore.removeStatement(stmnt1);
+        assertEquals(49, count(parentStore));
+
+        assertFalse(parentStore.containsStatement(stmnt1));
+
+        final MemoryTimeMerger otherMerger = new MemoryTimeMerger(childStore, parentStore,
+                new VisibilityStatementMerger(), currentDate, RYA_INSTANCE, 0L);
+        otherMerger.runJob();
+
+        //merging will have added the statement back
+        assertEquals(50, count(parentStore));
+    }
+
+    @Test
+    public void childToParent_BothAddTest() throws Exception {
+        loadMockStatements(parentStore, 50, new Date(currentDate.getTime() + 10000L));
+
+        assertEquals(0, count(childStore));
+        final MemoryTimeMerger merger = new MemoryTimeMerger(parentStore, childStore,
+                new VisibilityStatementMerger(), currentDate, RYA_INSTANCE, 0L);
+        merger.runJob();
+
+        assertEquals(50, count(parentStore));
+        assertEquals(50, count(childStore));
+
+        //add a statement to each store
+        final RyaStatement stmnt1 = makeRyaStatement("http://subject", "http://predicate", "http://add");
+        final RyaStatement stmnt2 = makeRyaStatement("http://subject", "http://predicate", "http://add2");
+        stmnt1.setTimestamp(new Date().getTime() + 10L);
+        stmnt2.setTimestamp(currentDate.getTime() + 1000L);
+        parentStore.addStatement(stmnt1);
+        childStore.addStatement(stmnt2);
+
+        final MemoryTimeMerger otherMerger = new MemoryTimeMerger(childStore, parentStore,
+                new VisibilityStatementMerger(), currentDate, RYA_INSTANCE, 0L);
+        otherMerger.runJob();
+        //both should still be there
+        assertEquals(52, count(parentStore));
+    }
+
+    /**
+     * Adds {@code count} statements, all stamped with {@code timestamp}, to the
+     * provided store.
+     * @param store - The {@link RyaStatementStore} to load the statements into.
+     * @param count - The number of statements to create.
+     * @param timestamp - The timestamp applied to every created statement.
+     * @throws AddStatementException if a statement could not be added.
+     */
+    private void loadMockStatements(final RyaStatementStore store, final int count, final Date timestamp) throws AddStatementException {
+        for(int ii = 0; ii < count; ii++) {
+            final RyaStatement statement = makeRyaStatement("http://subject", "http://predicate", "http://"+ii);
+            statement.setTimestamp(timestamp.getTime());
+            store.addStatement(statement);
+        }
+    }
+
+    /**
+     * @param store - The {@link RyaStatementStore} to count the statements of.
+     * @return The number of statements the store fetches.
+     * @throws FetchStatementException if the statements could not be fetched.
+     */
+    private int count(final RyaStatementStore store) throws FetchStatementException {
+        final Iterator<RyaStatement> statements = store.fetchStatements();
+        int count = 0;
+        while(statements.hasNext()) {
+            statements.next();
+            count++;
+        }
+        return count;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.mongo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.mongo/pom.xml b/extras/rya.export/export.mongo/pom.xml index e350a92..699daf7 100644 --- a/extras/rya.export/export.mongo/pom.xml +++ b/extras/rya.export/export.mongo/pom.xml @@ -25,7 +25,7 @@ under the License. <parent> <groupId>org.apache.rya</groupId> <artifactId>rya.export.parent</artifactId> - <version>3.2.10-SNAPSHOT</version> + <version>3.2.11-incubating-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> @@ -39,26 +39,9 @@ under the License. <dependency> <groupId>org.apache.rya</groupId> <artifactId>rya.export.api</artifactId> - <version>3.2.10-SNAPSHOT</version> + <version>${project.version}</version> </dependency> - - <!-- Log4j 2 bridge, api, and core. --> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-1.2-api</artifactId> - <version>2.5</version> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> - <version>2.5</version> - </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - <version>2.5</version> - </dependency> - + <!-- Testing dependencies. --> <dependency> <groupId>junit</groupId> @@ -66,38 +49,5 @@ under the License. <scope>test</scope> </dependency> </dependencies> - - <build> - <plugins> - <!-- Use the pre-build 'jar-with-dependencies' assembly to package the dependent class files into the final jar. - This creates a jar file that can be deployed to Fluo without having to include any dependent jars. 
--> - <plugin> - <artifactId>maven-assembly-plugin</artifactId> - <configuration> - <archive> - <manifest> - <addClasspath>true</addClasspath> - <classpathLayoutType>custom</classpathLayoutType> - <customClasspathLayout>WEB-INF/lib/$${artifact.groupIdPath}/$${artifact.artifactId}-$${artifact.version}$${dashClassifier?}.$${artifact.extension}</customClasspathLayout> - - <mainClass>org.apache.rya.indexing.pcj.fluo.PcjAdminClient</mainClass> - </manifest> - </archive> - <descriptorRefs> - <descriptorRef>jar-with-dependencies</descriptorRef> - </descriptorRefs> - </configuration> - <executions> - <execution> - <id>make-assembly</id> - <phase>package</phase> - <goals> - <goal>single</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> </project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/MongoRyaStatementStore.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/MongoRyaStatementStore.java b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/MongoRyaStatementStore.java index ed24080..a25f6b2 100644 --- a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/MongoRyaStatementStore.java +++ b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/MongoRyaStatementStore.java @@ -19,17 +19,25 @@ package org.apache.rya.export.mongo; import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; +import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Optional; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.persist.RyaDAOException; +import 
org.apache.rya.export.api.metadata.MergeParentMetadata; +import org.apache.rya.export.api.metadata.ParentMetadataExistsException; import org.apache.rya.export.api.store.AddStatementException; import org.apache.rya.export.api.store.ContainsStatementException; import org.apache.rya.export.api.store.RemoveStatementException; import org.apache.rya.export.api.store.RyaStatementStore; import org.apache.rya.export.api.store.UpdateStatementException; +import org.apache.rya.export.mongo.parent.MongoParentMetadataRepository; +import org.apache.rya.mongodb.MongoDBRyaDAO; +import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy; import com.mongodb.BasicDBObject; import com.mongodb.Cursor; @@ -37,23 +45,20 @@ import com.mongodb.DB; import com.mongodb.DBObject; import com.mongodb.MongoClient; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.persist.RyaDAOException; -import mvm.rya.mongodb.MongoDBRyaDAO; -import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy; - /** * Mongo implementation of {@link RyaStatementStore}. Allows for exporting and * importing rya statements from MongoDB. */ -public class MongoRyaStatementStore implements RyaStatementStore{ +public class MongoRyaStatementStore implements RyaStatementStore { public static final String TRIPLES_COLLECTION = "rya__triples"; - private final SimpleMongoDBStorageStrategy adapter; - private final DB db; + public static final String METADATA_COLLECTION = "parent_metadata"; + protected final SimpleMongoDBStorageStrategy adapter; + protected final DB db; private final String ryaInstanceName; private final MongoClient client; private final MongoDBRyaDAO dao; + private final MongoParentMetadataRepository parentMetadataRepo; /** * Creates a new {@link MongoRyaStatementStore}. 
@@ -67,6 +72,7 @@ public class MongoRyaStatementStore implements RyaStatementStore{
         this.dao = checkNotNull(dao);
         db = this.client.getDB(ryaInstanceName);
         adapter = new SimpleMongoDBStorageStrategy();
+        parentMetadataRepo = new MongoParentMetadataRepository(client, ryaInstance);
     }
 
     @Override
@@ -105,7 +111,10 @@ public class MongoRyaStatementStore implements RyaStatementStore{
         return db.getCollection(TRIPLES_COLLECTION).find(dbo).count() > 0;
     }
 
-    protected MongoClient getClient() {
+    /**
+     * @return The {@link MongoClient} to connect to mongo.
+     */
+    public MongoClient getClient() {
         return client;
     }
 
@@ -115,4 +124,29 @@ public class MongoRyaStatementStore implements RyaStatementStore{
         //Do not want a throw a not-implemented exception since that could potentially
         //break stuff.
     }
+
+    /**
+     * Fetches the parent metadata recorded for this store.
+     * @return The stored {@link MergeParentMetadata}, or empty when none has been set.
+     */
+    @Override
+    public Optional<MergeParentMetadata> getParentMetadata() {
+        try {
+            return Optional.ofNullable(parentMetadataRepo.get());
+        } catch (final org.apache.rya.export.api.metadata.ParentMetadataDoesNotExistException e) {
+            // NOTE(review): assumes the repository reports a missing record with this
+            // exception (confirm); any other failure now propagates instead of being swallowed.
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Stores the parent metadata through the metadata repository.
+     * @param metadata - The {@link MergeParentMetadata} to store.
+     * @throws ParentMetadataExistsException propagated from the metadata repository.
+     */
+    @Override
+    public void setParentMetadata(final MergeParentMetadata metadata) throws ParentMetadataExistsException {
+        parentMetadataRepo.set(metadata);
+    }
 }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/MongoRyaStatementStoreDecorator.java
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/MongoRyaStatementStoreDecorator.java b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/MongoRyaStatementStoreDecorator.java
deleted file mode 100644
index 6f02b44..0000000
--- a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/MongoRyaStatementStoreDecorator.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.export.mongo; - -import static com.google.common.base.Preconditions.checkNotNull; - -import org.apache.rya.export.api.store.RyaStatementStore; -import org.apache.rya.export.api.store.RyaStatementStoreDecorator; - -import com.mongodb.MongoClient; - -/** - * Ensures the decorator that the decorated store is mongodb backed. - */ -public abstract class MongoRyaStatementStoreDecorator extends RyaStatementStoreDecorator { - final MongoRyaStatementStore store; - - /** - * Creates a new {@link MongoRyaStatementStoreDecorator} around the provided {@link RyaStatementStore}. - * @param store - The {@link RyaStatementStore} to decorate. 
- */ - public MongoRyaStatementStoreDecorator(final MongoRyaStatementStore store) { - super(store); - this.store = checkNotNull(store); - } - - protected MongoClient getClient() { - return store.getClient(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/MongoParentMetadataRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/MongoParentMetadataRepository.java b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/MongoParentMetadataRepository.java index 84471d3..b42fe62 100644 --- a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/MongoParentMetadataRepository.java +++ b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/MongoParentMetadataRepository.java @@ -20,10 +20,10 @@ package org.apache.rya.export.mongo.parent; import static com.google.common.base.Preconditions.checkNotNull; -import org.apache.rya.export.api.parent.MergeParentMetadata; -import org.apache.rya.export.api.parent.ParentMetadataDoesNotExistException; -import org.apache.rya.export.api.parent.ParentMetadataExistsException; -import org.apache.rya.export.api.parent.ParentMetadataRepository; +import org.apache.rya.export.api.metadata.MergeParentMetadata; +import org.apache.rya.export.api.metadata.ParentMetadataDoesNotExistException; +import org.apache.rya.export.api.metadata.ParentMetadataExistsException; +import org.apache.rya.export.api.metadata.ParentMetadataRepository; import com.mongodb.DBCollection; import com.mongodb.DBObject; @@ -38,8 +38,9 @@ public class MongoParentMetadataRepository implements ParentMetadataRepository { private final DBCollection collection; /** - * @param client - * @param dbName + * Creates a new {@link 
MongoParentMetadataRepository} + * @param client - The client connection to mongo. + * @param dbName - The database to connect to, usually the RyaInstanceName */ public MongoParentMetadataRepository(final MongoClient client, final String dbName) { checkNotNull(client); @@ -65,5 +66,4 @@ public class MongoParentMetadataRepository implements ParentMetadataRepository { final DBObject dbo = adapter.serialize(metadata); collection.insert(dbo); } - -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapter.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapter.java b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapter.java index c6f2401..fea962d 100644 --- a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapter.java +++ b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapter.java @@ -20,7 +20,7 @@ package org.apache.rya.export.mongo.parent; import java.util.Date; -import org.apache.rya.export.api.parent.MergeParentMetadata; +import org.apache.rya.export.api.metadata.MergeParentMetadata; import com.mongodb.BasicDBObjectBuilder; import com.mongodb.DBObject; @@ -34,6 +34,7 @@ public class ParentMetadataRepositoryAdapter { public static final String TIMESTAMP_KEY = "timestamp"; public static final String FILTER_TIMESTAMP_KEY = "filterTimestamp"; public static final String PARENT_TIME_OFFSET_KEY = "parentTimeOffset"; + /** * Serializes the {@link MergeParentMetadata} into a mongoDB object. * @param metadata - The {@link MergeParentMetadata} to serialize. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/policy/TimestampPolicyMongoRyaStatementStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/policy/TimestampPolicyMongoRyaStatementStore.java b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/policy/TimestampPolicyMongoRyaStatementStore.java
new file mode 100644
index 0000000..2bb923e
--- /dev/null
+++ b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/policy/TimestampPolicyMongoRyaStatementStore.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.rya.export.mongo.policy;
+
+import static org.apache.rya.export.mongo.MongoRyaStatementStore.TRIPLES_COLLECTION;
+import static org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.export.api.conf.policy.TimestampPolicyStatementStore;
+import org.apache.rya.export.api.store.FetchStatementException;
+import org.apache.rya.export.api.store.RyaStatementStore;
+import org.apache.rya.export.mongo.MongoRyaStatementStore;
+import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+
+import com.mongodb.BasicDBObject;
+import com.mongodb.BasicDBObjectBuilder;
+import com.mongodb.Cursor;
+import com.mongodb.DB;
+import com.mongodb.DBObject;
+
+/**
+ * A {@link RyaStatementStore} decorated to connect to a Mongo database and
+ * filter statements based on a timestamp.
+ */
+public class TimestampPolicyMongoRyaStatementStore extends TimestampPolicyStatementStore {
+    private final SimpleMongoDBStorageStrategy adapter;
+    private final DB db;
+
+    /**
+     * Creates a new {@link TimestampPolicyMongoRyaStatementStore}
+     * @param store - The {@link MongoRyaStatementStore} to connect to
+     * @param timestamp - The Date to filter statements on.
+     * @param ryaInstanceName - The rya instance to merge statements to/from.
+     */
+    public TimestampPolicyMongoRyaStatementStore(final MongoRyaStatementStore store, final Date timestamp, final String ryaInstanceName) {
+        super(store, timestamp);
+        adapter = new SimpleMongoDBStorageStrategy();
+        db = store.getClient().getDB(ryaInstanceName);
+    }
+
+    /**
+     * Fetches the statements whose timestamp is at or after the policy timestamp,
+     * ordered by ascending timestamp.
+     * @return An iterator over the matching {@link RyaStatement}s.
+     * @throws FetchStatementException declared by the overridden method.
+     */
+    @Override
+    public Iterator<RyaStatement> fetchStatements() throws FetchStatementException {
+        final DBObject timeObj = new BasicDBObjectBuilder()
+            .add(TIMESTAMP,
+                new BasicDBObjectBuilder()
+                    .add("$gte", timestamp.getTime()).get())
+            .get();
+        final Cursor cur = db.getCollection(TRIPLES_COLLECTION).find(timeObj).sort(new BasicDBObject(TIMESTAMP, 1));
+        final List<RyaStatement> statements = new ArrayList<>();
+        try {
+            while(cur.hasNext()) {
+                statements.add(adapter.deserializeDBObject(cur.next()));
+            }
+        } finally {
+            // Release the cursor even if deserialization fails part way through.
+            cur.close();
+        }
+        return statements.iterator();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/time/TimeMongoRyaStatementStore.java
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/time/TimeMongoRyaStatementStore.java b/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/time/TimeMongoRyaStatementStore.java
deleted file mode 100644
index 4c948ed..0000000
--- a/extras/rya.export/export.mongo/src/main/java/org/apache/rya/export/mongo/time/TimeMongoRyaStatementStore.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.rya.export.mongo.time; - -import static com.google.common.base.Preconditions.checkNotNull; -import static mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy.TIMESTAMP; - -import java.util.ArrayList; -import java.util.Date; -import java.util.Iterator; -import java.util.List; - -import org.apache.commons.lang.builder.EqualsBuilder; -import org.apache.commons.lang.builder.HashCodeBuilder; -import org.apache.rya.export.mongo.MongoRyaStatementStore; -import org.apache.rya.export.mongo.MongoRyaStatementStoreDecorator; - -import com.mongodb.BasicDBObject; -import com.mongodb.Cursor; -import com.mongodb.DB; - -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.mongodb.dao.SimpleMongoDBStorageStrategy; - -/** - * A {@link MongoRyaStatementStore} that, when fetching statements, only - * fetches statements after a certain time. - */ -public class TimeMongoRyaStatementStore extends MongoRyaStatementStoreDecorator { - private final Date time; - private final DB db; - - private final SimpleMongoDBStorageStrategy adapter; - - /** - * Creates a new {@link TimeMongoRyaStatementStore}. - * @param store - The {@link MongoRyaStatementStore} to decorate. - * @param time - The time used when fetching statements. - * @param ryaInstanceName - The rya instance used. 
- */ - public TimeMongoRyaStatementStore(final MongoRyaStatementStore store, final Date time, final String ryaInstanceName) { - super(store); - this.time = checkNotNull(time); - db = getClient().getDB(ryaInstanceName); - adapter = new SimpleMongoDBStorageStrategy(); - } - - /** - * @return - * @see org.apache.rya.export.mongo.MongoRyaStatementStore#fetchStatements() - */ - @Override - public Iterator<RyaStatement> fetchStatements() { - //RyaStatement timestamps are stored as longs, not dates. - final BasicDBObject dbo = new BasicDBObject(TIMESTAMP, new BasicDBObject("$gte", time.getTime())); - final Cursor cur = db.getCollection(MongoRyaStatementStore.TRIPLES_COLLECTION).find(dbo).sort(new BasicDBObject(TIMESTAMP, 1)); - final List<RyaStatement> statements = new ArrayList<>(); - while(cur.hasNext()) { - final RyaStatement statement = adapter.deserializeDBObject(cur.next()); - statements.add(statement); - } - return statements.iterator(); - } - - @Override - public boolean equals(final Object obj) { - if(obj instanceof TimeMongoRyaStatementStore) { - final TimeMongoRyaStatementStore other = (TimeMongoRyaStatementStore) obj; - final EqualsBuilder builder = new EqualsBuilder() - .appendSuper(super.equals(obj)) - .append(time, other.time); - return builder.isEquals(); - } - return false; - } - - - @Override - public int hashCode() { - final HashCodeBuilder builder = new HashCodeBuilder() - .appendSuper(super.hashCode()) - .append(time); - return builder.toHashCode(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/export.mongo/src/test/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapterTest.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.mongo/src/test/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapterTest.java 
b/extras/rya.export/export.mongo/src/test/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapterTest.java index d5d4757..d0e412d 100644 --- a/extras/rya.export/export.mongo/src/test/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapterTest.java +++ b/extras/rya.export/export.mongo/src/test/java/org/apache/rya/export/mongo/parent/ParentMetadataRepositoryAdapterTest.java @@ -26,7 +26,7 @@ import static org.junit.Assert.assertEquals; import java.util.Date; -import org.apache.rya.export.api.parent.MergeParentMetadata; +import org.apache.rya.export.api.metadata.MergeParentMetadata; import org.junit.Test; import com.mongodb.BasicDBObjectBuilder; http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.export/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.export/pom.xml b/extras/rya.export/pom.xml index dfb9bad..9c95bc7 100644 --- a/extras/rya.export/pom.xml +++ b/extras/rya.export/pom.xml @@ -25,7 +25,7 @@ under the License. <parent> <groupId>org.apache.rya</groupId> <artifactId>rya.extras</artifactId> - <version>3.2.10-SNAPSHOT</version> + <version>3.2.11-incubating-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> @@ -37,8 +37,9 @@ under the License. <packaging>pom</packaging> <modules> - <module>export.accumulo</module> <module>export.api</module> + <module>export.accumulo</module> + <module>export.client</module> <module>export.mongo</module> <module>export.integration</module> </modules> @@ -64,22 +65,22 @@ under the License. 
</plugins> </build> - <!-- - note, this parent pom can likely be removed provided the contents of its - dependencyManagement section are transferred to org.apache.rya:rya-project - --> + <!-- + note, this parent pom can likely be removed provided the contents of its + dependencyManagement section are transferred to org.apache.rya:rya-project + --> <dependencyManagement> <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <version>${hadoop.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop.version}</version> - </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <version>${hadoop.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <version>${hadoop.version}</version> + </dependency> <dependency> <groupId>org.openrdf.sesame</groupId> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java ---------------------------------------------------------------------- diff --git a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java index 8cb3a7b..5d2c52d 100644 --- a/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java +++ b/extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java @@ -87,8 +87,9 @@ public class GeoRyaSailFactory { final MongoRyaInstanceDetailsRepository ryaDetailsRepo = new MongoRyaInstanceDetailsRepository(client, mongoConfig.getCollectionName()); RyaDetailsToConfiguration.addRyaDetailsToConfiguration(ryaDetailsRepo.getRyaInstanceDetails(), mongoConfig); } catch (final 
RyaDetailsRepositoryException e) { - LOG.info("Instance does not have a rya details collection, skipping."); - } dao = getMongoDAO((MongoDBRdfConfiguration)rdfConfig, client); + LOG.info("Instance does not have a rya details collection, skipping."); + } + dao = getMongoDAO((MongoDBRdfConfiguration)rdfConfig, client); } else { rdfConfig = new AccumuloRdfConfiguration(config); user = rdfConfig.get(ConfigUtils.CLOUDBASE_USER); @@ -143,4 +144,18 @@ public class GeoRyaSailFactory { dao.init(); return dao; } +<<<<<<< HEAD:extras/rya.geoindexing/src/main/java/org/apache/rya/indexing/GeoRyaSailFactory.java +======= + + private static void updateAccumuloConfig(final AccumuloRdfConfiguration config, final String user, final String pswd, final String ryaInstance) throws AccumuloException, AccumuloSecurityException { + try { + final PasswordToken pswdToken = new PasswordToken(pswd); + final Instance accInst = ConfigUtils.getInstance(config); + final AccumuloRyaInstanceDetailsRepository ryaDetailsRepo = new AccumuloRyaInstanceDetailsRepository(accInst.getConnector(user, pswdToken), ryaInstance); + RyaDetailsToConfiguration.addRyaDetailsToConfiguration(ryaDetailsRepo.getRyaInstanceDetails(), config); + } catch(final RyaDetailsRepositoryException e) { + LOG.info("Instance does not have a rya details collection, skipping."); + } + } +>>>>>>> RYA-161 Merge Tool with client:extras/indexing/src/main/java/mvm/rya/sail/config/RyaSailFactory.java } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/b61920ab/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 053a692..3c47038 100644 --- a/pom.xml +++ b/pom.xml @@ -587,7 +587,6 @@ under the License. <groupId>org.apache.accumulo</groupId> <artifactId>accumulo-minicluster</artifactId> <version>${accumulo.version}</version> - <scope>test</scope> <exclusions> <!-- released under the LGPL license --> <exclusion>
