RYA-185 Finished implementing and testing the Accumulo LoadStatementsFile command.
Closes #95 Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/961b16aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/961b16aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/961b16aa Branch: refs/heads/master Commit: 961b16aac9c9c6fcef105e88dc56d3c9abe6804e Parents: a196b50 Author: Kevin Chilton <[email protected]> Authored: Fri Sep 16 01:03:08 2016 -0400 Committer: Aaron Mihalik <[email protected]> Committed: Tue Nov 1 11:43:10 2016 -0400 ---------------------------------------------------------------------- .../mvm/rya/api/client/LoadStatementsFile.java | 20 --- .../rya/api/client/LoadStatementsFile.java | 44 ++++++ .../org/apache/rya/api/client/RyaClient.java | 12 +- .../accumulo/AccumuloLoadStatementsFile.java | 134 +++++++++++++++++++ .../accumulo/AccumuloRyaClientFactory.java | 9 +- .../accumulo/AccumuloLoadStatementsFileIT.java | 126 +++++++++++++++++ extras/indexing/src/test/resources/example.ttl | 5 + 7 files changed, 325 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/961b16aa/common/rya.api/src/main/java/mvm/rya/api/client/LoadStatementsFile.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/mvm/rya/api/client/LoadStatementsFile.java b/common/rya.api/src/main/java/mvm/rya/api/client/LoadStatementsFile.java deleted file mode 100644 index 3918a22..0000000 --- a/common/rya.api/src/main/java/mvm/rya/api/client/LoadStatementsFile.java +++ /dev/null @@ -1,20 +0,0 @@ -package mvm.rya.api.client; - -import java.nio.file.Path; - -import javax.annotation.ParametersAreNonnullByDefault; - -/** - * Loads a local file of RDF statements into an instance of Rya. 
- */ -@ParametersAreNonnullByDefault -public interface LoadStatementsFile { - - /** - * Loads a local file of RDF statements into an instance of Rya. - * - * @param ryaInstanceName - The name of the Rya instance the statements will be loaded into. (not null) - * @param statementsFile - A file that holds RDF statements that will be loaded. (not null) - */ - public void loadStatements(String ryaInstanceName, Path statementsFile); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/961b16aa/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatementsFile.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatementsFile.java b/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatementsFile.java new file mode 100644 index 0000000..0fd987a --- /dev/null +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/LoadStatementsFile.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.client; + +import java.nio.file.Path; + +import org.openrdf.rio.RDFFormat; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * Loads a local file of RDF statements into an instance of Rya. + */ +@DefaultAnnotation(NonNull.class) +public interface LoadStatementsFile { + + /** + * Loads a local file of RDF statements into an instance of Rya. + * + * @param ryaInstanceName - The name of the Rya instance the statements will be loaded into. (not null) + * @param statementsFile - A file that holds RDF statements that will be loaded. (not null) + * @param format - The format of the statements file. (not null) + * @throws InstanceDoesNotExistException No instance of Rya exists for the provided name. + * @throws RyaClientException Something caused the command to fail. + */ + public void loadStatements(String ryaInstanceName, Path statementsFile, RDFFormat format) throws InstanceDoesNotExistException, RyaClientException; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/961b16aa/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java ---------------------------------------------------------------------- diff --git a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java index d4b5047..9e0ef4f 100644 --- a/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java +++ b/common/rya.api/src/main/java/org/apache/rya/api/client/RyaClient.java @@ -41,6 +41,7 @@ public class RyaClient { private final AddUser addUser; private final RemoveUser removeUser; private final Uninstall uninstall; + private final LoadStatementsFile loadStatementsFile; /** * Constructs an instance of {@link RyaClient}. 
@@ -55,7 +56,8 @@ public class RyaClient { final ListInstances listInstances, final AddUser addUser, final RemoveUser removeUser, - final Uninstall uninstall) { + final Uninstall uninstall, + final LoadStatementsFile loadStatementsFile) { this.install = requireNonNull(install); this.createPcj = requireNonNull(createPcj); this.deletePcj = requireNonNull(deletePcj); @@ -66,6 +68,7 @@ public class RyaClient { this.addUser = requireNonNull(addUser); this.removeUser = requireNonNull(removeUser); this.uninstall = requireNonNull(uninstall); + this.loadStatementsFile = requireNonNull( loadStatementsFile ); } /** @@ -140,4 +143,11 @@ public class RyaClient { public Uninstall getUninstall() { return uninstall; } + + /** + * @return An instance of {@link LoadStatementsFile} that is connected to a Rya storage. + */ + public LoadStatementsFile getLoadStatementsFile() { + return loadStatementsFile; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/961b16aa/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatementsFile.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatementsFile.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatementsFile.java new file mode 100644 index 0000000..f39951d --- /dev/null +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatementsFile.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.api.client.accumulo; + +import static java.util.Objects.requireNonNull; + +import java.io.IOException; +import java.nio.file.Path; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.log4j.Logger; +import org.apache.rya.accumulo.AccumuloRdfConfiguration; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.InstanceExists; +import org.apache.rya.api.client.LoadStatementsFile; +import org.apache.rya.api.client.RyaClientException; +import org.apache.rya.api.persist.RyaDAOException; +import org.apache.rya.indexing.accumulo.ConfigUtils; +import org.apache.rya.rdftriplestore.inference.InferenceEngineException; +import org.apache.rya.sail.config.RyaSailFactory; +import org.openrdf.repository.RepositoryException; +import org.openrdf.repository.sail.SailRepository; +import org.openrdf.repository.sail.SailRepositoryConnection; +import org.openrdf.rio.RDFFormat; +import org.openrdf.rio.RDFParseException; +import org.openrdf.sail.Sail; +import org.openrdf.sail.SailException; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * An Accumulo implementation of the {@link LoadStatementsFile} command. 
+ */ +@DefaultAnnotation(NonNull.class) +public class AccumuloLoadStatementsFile extends AccumuloCommand implements LoadStatementsFile { + private static final Logger log = Logger.getLogger(AccumuloLoadStatementsFile.class); + + private final InstanceExists instanceExists; + + /** + * Constructs an instance of {@link AccumuloLoadStatementsFile}. + * + * @param connectionDetails - Details about the values that were used to create + * the connector to the cluster. (not null) + * @param connector - Provides programmatic access to the instance of Accumulo + * that hosts Rya instance. (not null) + */ + public AccumuloLoadStatementsFile(final AccumuloConnectionDetails connectionDetails, final Connector connector) { + super(connectionDetails, connector); + instanceExists = new AccumuloInstanceExists(connectionDetails, connector); + } + + @Override + public void loadStatements(final String ryaInstanceName, final Path statementsFile, final RDFFormat format) throws InstanceDoesNotExistException, RyaClientException { + requireNonNull(ryaInstanceName); + requireNonNull(statementsFile); + requireNonNull(format); + + // Ensure the Rya Instance exists. + if(!instanceExists.exists(ryaInstanceName)) { + throw new InstanceDoesNotExistException(String.format("There is no Rya instance named '%s'.", ryaInstanceName)); + } + + Sail sail = null; + SailRepository sailRepo = null; + SailRepositoryConnection sailRepoConn = null; + + try { + // Get a Sail object that is connected to the Rya instance. 
+ final AccumuloConnectionDetails connDetails = getAccumuloConnectionDetails(); + + final AccumuloRdfConfiguration ryaConf = new AccumuloRdfConfiguration(); + ryaConf.setTablePrefix( ryaInstanceName ); + ryaConf.set(ConfigUtils.CLOUDBASE_ZOOKEEPERS, connDetails.getZookeepers()); + ryaConf.set(ConfigUtils.CLOUDBASE_INSTANCE, connDetails.getInstanceName()); + ryaConf.set(ConfigUtils.CLOUDBASE_USER, connDetails.getUsername()); + ryaConf.set(ConfigUtils.CLOUDBASE_PASSWORD, new String(connDetails.getPassword())); + + sail = RyaSailFactory.getInstance(ryaConf); + + // Load the file. + sailRepo = new SailRepository( sail ); + sailRepoConn = sailRepo.getConnection(); + sailRepoConn.add(statementsFile.toFile(), null, format); + + } catch (final SailException | AccumuloException | AccumuloSecurityException | RyaDAOException | InferenceEngineException e) { + throw new RyaClientException("A problem connecting to the Rya instance named '" + ryaInstanceName + "' has caused the load to fail.", e); + } catch (final RepositoryException | RDFParseException | IOException e) { + throw new RyaClientException("A problem processing the RDF file has caused the load into the Rya instance named '" + ryaInstanceName + "' to fail.", e); + } finally { + // Shut it all down.
+ if(sailRepoConn != null) { + try { + sailRepoConn.close(); + } catch (final RepositoryException e) { + log.warn("Couldn't close the SailRepoConnection that is attached to the Rya instance.", e); + } + } + if(sailRepo != null) { + try { + sailRepo.shutDown(); + } catch (final RepositoryException e) { + log.warn("Couldn't shut down the SailRepository that is attached to the Rya instance.", e); + } + } + if(sail != null) { + try { + sail.shutDown(); + } catch (final SailException e) { + log.warn("Couldn't shut down the Sail that is attached to the Rya instance.", e); + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/961b16aa/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java index 0e642d9..b9742b0 100644 --- a/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java +++ b/extras/indexing/src/main/java/org/apache/rya/api/client/accumulo/AccumuloRyaClientFactory.java @@ -20,12 +20,12 @@ package org.apache.rya.api.client.accumulo; import static java.util.Objects.requireNonNull; -import edu.umd.cs.findbugs.annotations.DefaultAnnotation; -import edu.umd.cs.findbugs.annotations.NonNull; - import org.apache.accumulo.core.client.Connector; import org.apache.rya.api.client.RyaClient; +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import edu.umd.cs.findbugs.annotations.NonNull; + /** * Constructs instance of {@link RyaClient} that are connected to instance of * Rya hosted by Accumulo clusters. 
@@ -58,6 +58,7 @@ public class AccumuloRyaClientFactory { new AccumuloListInstances(connectionDetails, connector), new AccumuloAddUser(connectionDetails, connector), new AccumuloRemoveUser(connectionDetails, connector), - new AccumuloUninstall(connectionDetails, connector)); + new AccumuloUninstall(connectionDetails, connector), + new AccumuloLoadStatementsFile(connectionDetails, connector)); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/961b16aa/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatementsFileIT.java ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatementsFileIT.java b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatementsFileIT.java new file mode 100644 index 0000000..656737a --- /dev/null +++ b/extras/indexing/src/test/java/org/apache/rya/api/client/accumulo/AccumuloLoadStatementsFileIT.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.api.client.accumulo; + +import static org.junit.Assert.assertEquals; + +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.rya.accumulo.AccumuloITBase; +import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import org.apache.rya.api.client.Install; +import org.apache.rya.api.client.Install.InstallConfiguration; +import org.apache.rya.api.client.InstanceDoesNotExistException; +import org.apache.rya.api.client.RyaClient; +import org.apache.rya.api.domain.RyaStatement; +import org.apache.rya.api.resolver.RyaToRdfConversions; +import org.apache.rya.api.resolver.triple.TripleRow; +import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver; +import org.junit.Test; +import org.openrdf.model.Statement; +import org.openrdf.model.ValueFactory; +import org.openrdf.model.impl.ValueFactoryImpl; +import org.openrdf.rio.RDFFormat; + +/** + * Integration tests the methods of {@link AccumuloLoadStatementsFile}. + */ +public class AccumuloLoadStatementsFileIT extends AccumuloITBase { + + @Test(expected = InstanceDoesNotExistException.class) + public void instanceDoesNotExist() throws Exception { + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + ryaClient.getLoadStatementsFile().loadStatements("testInstance_", Paths.get("src/test/resources/example.ttl"), RDFFormat.TURTLE); + } + + @Test + public void loadTurtleFile() throws Exception { + // Install an instance of Rya. 
+ final String instanceName = "testInstance_"; + final InstallConfiguration installConfig = InstallConfiguration.builder() + .setEnableTableHashPrefix(false) + .setEnableEntityCentricIndex(false) + .setEnableFreeTextIndex(false) + .setEnableTemporalIndex(false) + .setEnablePcjIndex(false) + .setEnableGeoIndex(false) + .setFluoPcjAppName("fluo_app_name") + .build(); + + final AccumuloConnectionDetails connectionDetails = new AccumuloConnectionDetails( + getUsername(), + getPassword().toCharArray(), + getInstanceName(), + getZookeepers()); + + final RyaClient ryaClient = AccumuloRyaClientFactory.build(connectionDetails, getConnector()); + final Install install = ryaClient.getInstall(); + install.install(instanceName, installConfig); + + // Load the test statement file into the instance that was just installed. + ryaClient.getLoadStatementsFile().loadStatements(instanceName, Paths.get("src/test/resources/example.ttl"), RDFFormat.TURTLE); + + // Verify that the statements were loaded. + final ValueFactory vf = new ValueFactoryImpl(); + + final List<Statement> expected = new ArrayList<>(); + expected.add( vf.createStatement(vf.createURI("http://example#alice"), vf.createURI("http://example#talksTo"), vf.createURI("http://example#bob")) ); + expected.add( vf.createStatement(vf.createURI("http://example#bob"), vf.createURI("http://example#talksTo"), vf.createURI("http://example#charlie")) ); + expected.add( vf.createStatement(vf.createURI("http://example#charlie"), vf.createURI("http://example#likes"), vf.createURI("http://example#icecream")) ); + + final List<Statement> statements = new ArrayList<>(); + + final WholeRowTripleResolver tripleResolver = new WholeRowTripleResolver(); + final Scanner scanner = getConnector().createScanner(instanceName + "spo", new Authorizations()); + final Iterator<Entry<Key, Value>> it = scanner.iterator(); + while(it.hasNext()) { + final Entry<Key, Value> next = it.next(); + + final Key key = next.getKey(); + final byte[] row = key.getRow().getBytes(); + final byte[] columnFamily
= key.getColumnFamily().getBytes(); + final byte[] columnQualifier = key.getColumnQualifier().getBytes(); + final TripleRow tripleRow = new TripleRow(row, columnFamily, columnQualifier); + + final RyaStatement ryaStatement = tripleResolver.deserialize(TABLE_LAYOUT.SPO, tripleRow); + final Statement statement = RyaToRdfConversions.convertStatement(ryaStatement); + + // Filter out the rya version statement if it is present. + if(!statement.getPredicate().equals( vf.createURI("urn:mvm.rya/2012/05#version") )) { + statements.add( statement ); + } + } + + assertEquals(expected, statements); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/961b16aa/extras/indexing/src/test/resources/example.ttl ---------------------------------------------------------------------- diff --git a/extras/indexing/src/test/resources/example.ttl b/extras/indexing/src/test/resources/example.ttl new file mode 100644 index 0000000..6770792 --- /dev/null +++ b/extras/indexing/src/test/resources/example.ttl @@ -0,0 +1,5 @@ +@prefix example: <http://example#> . + +example:alice example:talksTo example:bob . +example:bob example:talksTo example:charlie. +example:charlie example:likes example:icecream. \ No newline at end of file
