RYA-126 Accumulo side implementation of export.api. (#15)
Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/f2b046f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/f2b046f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/f2b046f3 Branch: refs/heads/master Commit: f2b046f36765227571a4ba5155e9fd368a8cb16a Parents: 2c37b24 Author: ejwhite922 <eric.wh...@sparta.com> Authored: Thu Sep 8 13:14:10 2016 -0400 Committer: isper3at <smith...@gmail.com> Committed: Wed Nov 2 17:51:18 2016 -0400 ---------------------------------------------------------------------- extras/rya.export/export.accumulo/pom.xml | 160 +++++ .../rya.export/export.accumulo/src/.gitignore | 1 + .../accumulo/AccumuloRyaStatementStore.java | 220 +++++++ .../export/accumulo/common/InstanceType.java | 62 ++ .../accumulo/conf/AccumuloExportConstants.java | 195 ++++++ .../AccumuloParentMetadataRepository.java | 265 ++++++++ .../accumulo/util/AccumuloInstanceDriver.java | 603 ++++++++++++++++++ .../export/accumulo/util/AccumuloRyaUtils.java | 447 +++++++++++++ .../api/conf/AccumuloConfigurationAdapter.java | 67 ++ .../api/conf/AccumuloMergeConfiguration.java | 182 ++++++ .../src/main/xsd/AccumuloMergeConfiguration.xsd | 46 ++ .../accumulo/AccumuloRyaStatementStoreTest.java | 384 ++++++++++++ .../apache/rya/export/accumulo/TestUtils.java | 80 +++ .../driver/AccumuloDualInstanceDriver.java | 621 +++++++++++++++++++ ...muloParentMetadataRepositoryAdapterTest.java | 67 ++ .../conf/AccumuloConfigurationAdapterTest.java | 145 +++++ extras/rya.export/pom.xml | 1 + 17 files changed, 3546 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/pom.xml ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.accumulo/pom.xml b/extras/rya.export/export.accumulo/pom.xml new file mode 
100644 index 0000000..23598b0 --- /dev/null +++ b/extras/rya.export/export.accumulo/pom.xml @@ -0,0 +1,160 @@ +<?xml version="1.0" encoding="utf-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project + xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <parent> + <groupId>org.apache.rya</groupId> + <artifactId>rya.export.parent</artifactId> + <version>3.2.10-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>rya.export.accumulo</artifactId> + + <name>Apache Rya Export Accumulo</name> + <description>Contains the accumulo implementation of the export tool.</description> + + <dependencies> + <!-- Rya Runtime Dependencies. --> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.export.api</artifactId> + <version>3.2.10-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.indexing</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.rya</groupId> + <artifactId>rya.mapreduce</artifactId> + </dependency> + + <!-- Log4j 2 bridge, api, and core. 
--> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-1.2-api</artifactId> + <version>2.5</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-api</artifactId> + <version>2.5</version> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-core</artifactId> + <version>2.5</version> + </dependency> + + <!-- Accumulo --> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-core</artifactId> + <version>1.6.4</version> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo-start</artifactId> + <version>1.6.4</version> + </dependency> + <dependency> + <groupId>org.apache.accumulo</groupId> + <artifactId>accumulo</artifactId> + <version>1.6.4</version> + <type>pom</type> + </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + <version>0.9.1</version> + </dependency> + + <!-- Testing dependencies. --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <!-- Use the pre-build 'jar-with-dependencies' assembly to package the dependent class files into the final jar. + This creates a jar file that can be deployed to Fluo without having to include any dependent jars. 
--> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + <manifest> + <addClasspath>true</addClasspath> + <classpathLayoutType>custom</classpathLayoutType> + <customClasspathLayout>WEB-INF/lib/$${artifact.groupIdPath}/$${artifact.artifactId}-$${artifact.version}$${dashClassifier?}.$${artifact.extension}</customClasspathLayout> + + <mainClass>org.apache.rya.indexing.pcj.fluo.PcjAdminClient</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>jaxb2-maven-plugin</artifactId> + <version>2.2</version> + <executions> + <execution> + <id>xjc</id> + <goals> + <goal>xjc</goal> + </goals> + </execution> + </executions> + <configuration> + <sources> + <source>src/main/xsd/AccumuloMergeConfiguration.xsd</source> + </sources> + <packageName>org.apache.rya.export</packageName> + <outputDirectory>src/gen/java</outputDirectory> + </configuration> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/.gitignore ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.accumulo/src/.gitignore b/extras/rya.export/export.accumulo/src/.gitignore new file mode 100644 index 0000000..e8e450b --- /dev/null +++ b/extras/rya.export/export.accumulo/src/.gitignore @@ -0,0 +1 @@ +gen/ http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/AccumuloRyaStatementStore.java ---------------------------------------------------------------------- diff --git 
a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/AccumuloRyaStatementStore.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/AccumuloRyaStatementStore.java new file mode 100644 index 0000000..90c30ee --- /dev/null +++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/AccumuloRyaStatementStore.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.export.accumulo; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.log4j.Logger; +import org.apache.rya.export.accumulo.common.InstanceType; +import org.apache.rya.export.accumulo.util.AccumuloInstanceDriver; +import org.apache.rya.export.accumulo.util.AccumuloRyaUtils; +import org.apache.rya.export.api.MergerException; +import org.apache.rya.export.api.store.AddStatementException; +import org.apache.rya.export.api.store.ContainsStatementException; +import org.apache.rya.export.api.store.FetchStatementException; +import org.apache.rya.export.api.store.RemoveStatementException; +import org.apache.rya.export.api.store.RyaStatementStore; +import org.apache.rya.export.api.store.UpdateStatementException; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; + +import info.aduna.iteration.CloseableIteration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +/** + * Allows specific CRUD operations an Accumulo {@link RyaStatement} storage + * system. 
+ * <p> + * The operations specifically: + * <li>fetch all rya statements in the store</li> + * <li>add a rya statement to the store</li> + * <li>remove a rya statement from the store</li> + * <li>update an existing rya statement with a new one</li> + * + * One would use this {@link AccumuloRyaStatementStore} when they have an + * Accumulo database that is used when merging in data or exporting data. + */ +public class AccumuloRyaStatementStore implements RyaStatementStore { + private static final Logger log = Logger.getLogger(AccumuloRyaStatementStore.class); + + private final AccumuloRyaDAO accumuloRyaDao; + private final String tablePrefix; + private final Set<IteratorSetting> iteratorSettings = new HashSet<>(); + private final AccumuloInstanceDriver accumuloInstanceDriver; + + /** + * Creates a new instance of {@link AccumuloRyaStatementStore}. + * @param instanceName the Accumulo instance name. + * @param username the Accumulo user name. + * @param password the Accumulo user's password. + * @param instanceType the {@link InstanceType}. + * @param tablePrefix the Rya instance's table prefix. + * @param auths the comma-separated list of Accumulo authorizations for the + * user. + * @param zooKeepers the comma-separated list of zoo keeper host names. 
+ * @throws MergerException + */ + public AccumuloRyaStatementStore(final String instanceName, final String username, final String password, final InstanceType instanceType, final String tablePrefix, final String auths, final String zooKeepers) throws MergerException { + this.tablePrefix = tablePrefix; + if (tablePrefix != null) { + RdfCloudTripleStoreConstants.prefixTables(tablePrefix); + } + + final String driverName = instanceName + AccumuloRyaStatementStore.class.getSimpleName(); + accumuloInstanceDriver = new AccumuloInstanceDriver(driverName, instanceType, true, false, true, username, password, instanceName, tablePrefix, auths, zooKeepers); + try { + accumuloInstanceDriver.setUp(); + } catch (final Exception e) { + throw new MergerException(e); + } + accumuloRyaDao = accumuloInstanceDriver.getDao(); + } + + @Override + public Iterator<RyaStatement> fetchStatements() throws FetchStatementException { + try { + final RyaTripleContext ryaTripleContext = RyaTripleContext.getInstance(accumuloRyaDao.getConf()); + + Scanner scanner = null; + try { + scanner = AccumuloRyaUtils.getScanner(tablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, accumuloRyaDao.getConf()); + for (final IteratorSetting iteratorSetting : iteratorSettings) { + scanner.addScanIterator(iteratorSetting); + } + } catch (final IOException e) { + throw new FetchStatementException("Unable to get scanner to fetch Rya Statements", e); + } + // Convert Entry iterator to RyaStatement iterator + final Iterator<Entry<Key, Value>> entryIter = scanner.iterator(); + final Iterator<RyaStatement> ryaStatementIter = Iterators.transform(entryIter, new Function<Entry<Key, Value>, RyaStatement>() { + @Override + public RyaStatement apply(final Entry<Key, Value> entry) { + final Key key = entry.getKey(); + final Value value = entry.getValue(); + RyaStatement ryaStatement = null; + try { + ryaStatement = AccumuloRyaUtils.createRyaStatement(key, value, ryaTripleContext); + } catch (final 
TripleRowResolverException e) { + log.error("Unable to convert the key/value pair into a Rya Statement", e); + } + return ryaStatement; + } + }); + return ryaStatementIter; + } catch (final Exception e) { + throw new FetchStatementException("Failed to fetch statements.", e); + } + } + + @Override + public void addStatement(final RyaStatement statement) throws AddStatementException { + try { + accumuloRyaDao.add(statement); + } catch (final RyaDAOException e) { + throw new AddStatementException("Unable to add the Rya Statement", e); + } + } + + @Override + public void removeStatement(final RyaStatement statement) throws RemoveStatementException { + try { + accumuloRyaDao.delete(statement, accumuloRyaDao.getConf()); + } catch (final RyaDAOException e) { + throw new RemoveStatementException("Unable to delete the Rya Statement", e); + } + } + + @Override + public void updateStatement(final RyaStatement original, final RyaStatement update) throws UpdateStatementException { + try { + removeStatement(original); + addStatement(update); + } catch (final AddStatementException | RemoveStatementException e) { + throw new UpdateStatementException("Unable to update the Rya Statement", e); + } + } + + @Override + public boolean containsStatement(final RyaStatement ryaStatement) throws ContainsStatementException { + try { + final RyaStatement resultRyaStatement = findStatement(ryaStatement); + return resultRyaStatement != null; + } catch (final RyaDAOException e) { + throw new ContainsStatementException("Encountered an error while querying for statement.", e); + } + } + + public RyaStatement findStatement(final RyaStatement ryaStatement) throws RyaDAOException { + RyaStatement resultRyaStatement = null; + CloseableIteration<RyaStatement, RyaDAOException> iter = null; + try { + iter = accumuloRyaDao.getQueryEngine().query(ryaStatement, accumuloRyaDao.getConf()); + if (iter.hasNext()) { + resultRyaStatement = iter.next(); + } + } finally { + if (iter != null) { + iter.close(); + } + 
} + + return resultRyaStatement; + } + + /** + * @return the {@link AccumuloRyaDAO}. + */ + public AccumuloRyaDAO getRyaDAO() { + return accumuloRyaDao; + } + + /** + * @return the {@link AccumuloInstanceDriver}. + */ + public AccumuloInstanceDriver getAccumuloInstanceDriver() { + return accumuloInstanceDriver; + } + + /** + * Adds an iterator setting to the statement store for it to use when it + * fetches statements. + * @param iteratorSetting the {@link IteratorSetting} to add. + * (not {@code null}) + */ + public void addIterator(final IteratorSetting iteratorSetting) { + checkNotNull(iteratorSetting); + iteratorSettings.add(iteratorSetting); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/common/InstanceType.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/common/InstanceType.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/common/InstanceType.java new file mode 100644 index 0000000..41543d6 --- /dev/null +++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/common/InstanceType.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.rya.export.accumulo.common; + +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.minicluster.MiniAccumuloCluster; + +/** + * The type of Accumulo instance. + */ +public enum InstanceType { + /** + * An Accumulo instance that runs using a regular Accumulo distribution. + */ + DISTRIBUTION, + /** + * An Accumulo instance that runs using a {@link MiniAccumuloCluster}. + */ + MINI, + /** + * An Accumulo instance that runs using a {@link MockInstance}. + */ + MOCK; + + /** + * Finds the instance type by name. + * @param name the name to find. + * @return the {@link InstanceType} or {@code null} if none could be found. + */ + public static InstanceType fromName(String name) { + for (InstanceType instanceType : InstanceType.values()) { + if (instanceType.toString().equals(name)) { + return instanceType; + } + } + return null; + } + + /** + * @return {@code true} if the Accumulo instance is a {@link MockInstance}. + * {@code false} otherwise. 
+ */ + public boolean isMock() { + return this == MOCK; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/conf/AccumuloExportConstants.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/conf/AccumuloExportConstants.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/conf/AccumuloExportConstants.java new file mode 100644 index 0000000..ed0aa62 --- /dev/null +++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/conf/AccumuloExportConstants.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.export.accumulo.conf; + +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.iterators.user.TimestampFilter; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.apache.rya.export.accumulo.common.InstanceType; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import mvm.rya.accumulo.mr.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConfiguration; +import mvm.rya.indexing.accumulo.ConfigUtils; + +/** + * Constant used for Accumulo merger exports. + */ +public class AccumuloExportConstants { + private static final Logger log = Logger.getLogger(AccumuloExportConstants.class); + + /** + * Appended to certain config property names to indicate that the property is for the child instance. + */ + public static final String CHILD_SUFFIX = ".child"; + + /** + * The {@link InstanceType} to use for Accumulo. + */ + public static final String ACCUMULO_INSTANCE_TYPE_PROP = "ac.instance.type"; + + /** + * A value used for the {@link #START_TIME_PROP} property to indicate that a dialog + * should be displayed to select the time. + */ + public static final String USE_START_TIME_DIALOG = "dialog"; + + public static final SimpleDateFormat START_TIME_FORMATTER = new SimpleDateFormat("yyyyMMddHHmmssSSSz"); + + /** + * Map of keys that are supposed to use the same values. 
+ */ + public static final ImmutableMap<String, List<String>> DUPLICATE_KEY_MAP = ImmutableMap.<String, List<String>>builder() + .put(MRUtils.AC_MOCK_PROP, ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE)) + .put(MRUtils.AC_INSTANCE_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE)) + .put(MRUtils.AC_USERNAME_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_USER)) + .put(MRUtils.AC_PWD_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD)) + .put(MRUtils.AC_AUTH_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH)) + .put(MRUtils.AC_ZK_PROP, ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS)) + .put(MRUtils.TABLE_PREFIX_PROPERTY, ImmutableList.of(ConfigUtils.CLOUDBASE_TBL_PREFIX, RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX)) + .put(MRUtils.AC_MOCK_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE + CHILD_SUFFIX)) + .put(MRUtils.AC_INSTANCE_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE + CHILD_SUFFIX)) + .put(MRUtils.AC_USERNAME_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_USER + CHILD_SUFFIX)) + .put(MRUtils.AC_PWD_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD + CHILD_SUFFIX)) + .put(MRUtils.AC_AUTH_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS + CHILD_SUFFIX, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH + CHILD_SUFFIX)) + .put(MRUtils.AC_ZK_PROP + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS + CHILD_SUFFIX)) + .put(MRUtils.TABLE_PREFIX_PROPERTY + CHILD_SUFFIX, ImmutableList.of(ConfigUtils.CLOUDBASE_TBL_PREFIX + CHILD_SUFFIX, RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + CHILD_SUFFIX)) + .build(); + + /** + * Sets duplicate keys in the config. + * @param config the {@link Configuration}. 
+ */ + public static void setDuplicateKeys(final Configuration config) { + for (final Entry<String, List<String>> entry : DUPLICATE_KEY_MAP.entrySet()) { + final String key = entry.getKey(); + final List<String> duplicateKeys = entry.getValue(); + final String value = config.get(key); + if (value != null) { + for (final String duplicateKey : duplicateKeys) { + config.set(duplicateKey, value); + } + } + } + } + + /** + * Sets all duplicate keys for the property in the config to the specified value. + * @param config the {@link Configuration}. + * @param property the property to set and all its duplicates. + * @param value the value to set the property to. + */ + public static void setDuplicateKeysForProperty(final Configuration config, final String property, final String value) { + final List<String> duplicateKeys = DUPLICATE_KEY_MAP.get(property); + config.set(property, value); + if (duplicateKeys != null) { + for (final String key : duplicateKeys) { + config.set(key, value); + } + } + } + + /** + * Creates a formatted string for the start time based on the specified date and whether the dialog is to be displayed. + * @param startDate the start {@link Date} to format. + * @param isStartTimeDialogEnabled {@code true} to display the time dialog instead of using the date. {@code false} + * to use the provided {@code startDate}. + * @return the formatted start time string or {@code "dialog"}. + */ + public static String getStartTimeString(final Date startDate, final boolean isStartTimeDialogEnabled) { + String startTimeString; + if (isStartTimeDialogEnabled) { + startTimeString = USE_START_TIME_DIALOG; // set start date from dialog box + } else { + startTimeString = convertDateToStartTimeString(startDate); + } + return startTimeString; + } + + /** + * Converts the specified date into a string to use as the start time for the timestamp filter. + * @param date the start {@link Date} of the filter that will be formatted as a string. 
+ * @return the formatted start time string. + */ + public static String convertDateToStartTimeString(final Date date) { + final String startTimeString = START_TIME_FORMATTER.format(date); + return startTimeString; + } + + /** + * Converts the specified string into a date to use as the start time for the timestamp filter. + * @param startTimeString the formatted time string. + * @return the start {@link Date}. + */ + public static Date convertStartTimeStringToDate(final String startTimeString) { + Date date; + try { + date = START_TIME_FORMATTER.parse(startTimeString); + } catch (final ParseException e) { + log.error("Could not parse date", e); + return null; + } + return date; + } + + /** + * Creates an {@link IteratorSetting} with a time stamp filter that starts with the specified data. + * @param startTimeString the start time of the filter. + * @return the {@link IteratorSetting}. + */ + public static IteratorSetting getStartTimeSetting(final String startTimeString) { + Date date = null; + try { + date = START_TIME_FORMATTER.parse(startTimeString); + } catch (final ParseException e) { + throw new IllegalArgumentException("Couldn't parse " + startTimeString, e); + } + return getStartTimeSetting(date); + } + + /** + * Creates an {@link IteratorSetting} with a time stamp filter that starts with the specified data. + * @param date the start {@link Date} of the filter. + * @return the {@link IteratorSetting}. + */ + public static IteratorSetting getStartTimeSetting(final Date date) { + return getStartTimeSetting(date.getTime()); + } + + /** + * Creates an {@link IteratorSetting} with a time stamp filter that starts with the specified data. + * @param time the start time of the filter. + * @return the {@link IteratorSetting}. 
+ */ + public static IteratorSetting getStartTimeSetting(final long time) { + final IteratorSetting setting = new IteratorSetting(1, "startTimeIterator", TimestampFilter.class); + TimestampFilter.setStart(setting, time, true); + TimestampFilter.setEnd(setting, Long.MAX_VALUE, true); + return setting; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/parent/AccumuloParentMetadataRepository.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/parent/AccumuloParentMetadataRepository.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/parent/AccumuloParentMetadataRepository.java new file mode 100644 index 0000000..fbf8374 --- /dev/null +++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/parent/AccumuloParentMetadataRepository.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.rya.export.accumulo.parent; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Date; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.lexicoder.DateLexicoder; +import org.apache.accumulo.core.client.lexicoder.Lexicoder; +import org.apache.accumulo.core.client.lexicoder.LongLexicoder; +import org.apache.accumulo.core.client.lexicoder.StringLexicoder; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.security.TablePermission; +import org.apache.hadoop.io.Text; +import org.apache.log4j.Logger; +import org.apache.rya.export.api.MergerException; +import org.apache.rya.export.api.parent.MergeParentMetadata; +import org.apache.rya.export.api.parent.ParentMetadataDoesNotExistException; +import org.apache.rya.export.api.parent.ParentMetadataExistsException; +import org.apache.rya.export.api.parent.ParentMetadataRepository; + +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.mr.MRUtils; + +/** + * Accumulo repository for metadata pertaining to the parent database. This + * will contain all information to identify where any data was exported from. 
+ * <p> + * The data found here is: + * <li>Parent database Rya Instance Name</li> + * <li>Timestamp used as the lower cutoff for the export</li> + */ +public class AccumuloParentMetadataRepository implements ParentMetadataRepository { + private static final Logger log = Logger.getLogger(AccumuloParentMetadataRepository.class); + + /** + * The Row ID of all {@link MergeParentMetadata} entries that are stored in Accumulo. + */ + private static final Text MERGE_PARENT_METADATA_ROW_ID = new Text("MergeParentMetadata"); + + /** + * The Column Family for all MergeParentMetadata entries. + */ + private static final Text MERGE_PARENT_METADATA_FAMILY = new Text("mergeParentMetadata"); + + /** + * The Column Qualifier for the Rya instance name. + */ + private static final Text MERGE_PARENT_METADATA_RYA_INSTANCE_NAME = new Text("ryaInstanceName"); + + /** + * The Column Qualifier for the copy tool timestamp. + */ + private static final Text MERGE_PARENT_METADATA_TIMESTAMP = new Text("timestamp"); + + /** + * The Column Qualifier for the copy tool filter timestamp. + */ + private static final Text MERGE_PARENT_METADATA_FILTER_TIMESTAMP = new Text("filterTimestamp"); + + /** + * The Column Qualifier for the parent time offset. + */ + private static final Text MERGE_PARENT_METADATA_PARENT_TIME_OFFSET = new Text("parentTimeOffset"); + + // Lexicoders used to read/write MergeParentMetadata to/from Accumulo. 
+ private static final LongLexicoder LONG_LEXICODER = new LongLexicoder(); + private static final StringLexicoder STRING_LEXICODER = new StringLexicoder(); + private static final DateLexicoder DATE_LEXICODER = new DateLexicoder(); + + private static final String BASE_MERGE_METADATA_TABLE_NAME = "merge_metadata"; + + private final AccumuloRyaDAO accumuloRyaDao; + private final Connector connector; + private final String tablePrefix; + private final String mergeParentMetadataTableName; + private MergeParentMetadata metadata = null; + + /** + * Creates a new instance of {@link AccumuloParentMetadataRepository}. + * @param accumuloRyaDao the {@link AccumuloRyaDAO}. (not {@code null}) + */ + public AccumuloParentMetadataRepository(final AccumuloRyaDAO accumuloRyaDao) { + this.accumuloRyaDao = checkNotNull(accumuloRyaDao); + connector = accumuloRyaDao.getConnector(); + tablePrefix = accumuloRyaDao.getConf().getTablePrefix(); + mergeParentMetadataTableName = tablePrefix + BASE_MERGE_METADATA_TABLE_NAME; + } + + @Override + public MergeParentMetadata get() throws ParentMetadataDoesNotExistException { + if (metadata == null) { + metadata = getMetadataFromTable(); + } + return metadata; + } + + private MergeParentMetadata getMetadataFromTable() throws ParentMetadataDoesNotExistException { + try { + // Create an Accumulo scanner that iterates through the metadata entries. + final Scanner scanner = connector.createScanner(mergeParentMetadataTableName, new Authorizations()); + final Iterator<Entry<Key, Value>> entries = scanner.iterator(); + + // No metadata has been stored in the table yet. + if (!entries.hasNext()) { + log.error("Could not find any MergeParentMetadata metadata in the table named: " + mergeParentMetadataTableName); + } + + // Fetch the metadata from the entries. 
+ String ryaInstanceName = null; + Date timestamp = null; + Date filterTimestamp = null; + Long parentTimeOffset = null; + + while (entries.hasNext()) { + final Entry<Key, Value> entry = entries.next(); + final Text columnQualifier = entry.getKey().getColumnQualifier(); + final byte[] value = entry.getValue().get(); + + if (columnQualifier.equals(MERGE_PARENT_METADATA_RYA_INSTANCE_NAME)) { + ryaInstanceName = STRING_LEXICODER.decode(value); + } else if (columnQualifier.equals(MERGE_PARENT_METADATA_TIMESTAMP)) { + timestamp = DATE_LEXICODER.decode(value); + } else if (columnQualifier.equals(MERGE_PARENT_METADATA_FILTER_TIMESTAMP)) { + filterTimestamp = DATE_LEXICODER.decode(value); + } else if (columnQualifier.equals(MERGE_PARENT_METADATA_PARENT_TIME_OFFSET)) { + parentTimeOffset = LONG_LEXICODER.decode(value); + } + } + + return new MergeParentMetadata(ryaInstanceName, timestamp, filterTimestamp, parentTimeOffset); + } catch (final TableNotFoundException e) { + throw new ParentMetadataDoesNotExistException("Could not add results to a MergeParentMetadata because the MergeParentMetadata table does not exist.", e); + } catch (final Exception e) { + throw new ParentMetadataDoesNotExistException("Error occurred while getting merge parent metadata.", e); + } + } + + @Override + public void set(final MergeParentMetadata metadata) throws ParentMetadataExistsException { + try { + createTableIfNeeded(); + writeMetadata(metadata); + } catch (final MergerException e) { + throw new ParentMetadataExistsException("Unable to set MergeParentMetadata", e); + } + } + + private void createTableIfNeeded() throws MergerException { + try { + if (!doesMetadataTableExist()) { + log.debug("Creating table: " + mergeParentMetadataTableName); + connector.tableOperations().create(mergeParentMetadataTableName); + log.debug("Created table: " + mergeParentMetadataTableName); + log.debug("Granting authorizations to table: " + mergeParentMetadataTableName); + final String username = 
accumuloRyaDao.getConf().get(MRUtils.AC_USERNAME_PROP); + connector.securityOperations().grantTablePermission(username, mergeParentMetadataTableName, TablePermission.WRITE); + log.debug("Granted authorizations to table: " + mergeParentMetadataTableName); + } + } catch (final TableExistsException | AccumuloException | AccumuloSecurityException e) { + throw new MergerException("Could not create a new MergeParentMetadata table named: " + mergeParentMetadataTableName, e); + } + } + + private boolean doesMetadataTableExist() { + return connector.tableOperations().exists(mergeParentMetadataTableName); + } + + /** + * Create the {@link Mutation}s required to write a + * {@link MergerParentMetadata} object to an Accumulo table. + * @param metadata - The {@link MergeParentMetadata} to write. + * (not {@code null}) + * @return An ordered list of mutations that write the metadata to an + * Accumulo table. + */ + private static List<Mutation> makeWriteMetadataMutations(final MergeParentMetadata metadata) { + checkNotNull(metadata); + + final List<Mutation> mutations = new LinkedList<>(); + + // Rya Instance Name + final Mutation ryaInstanceNameMutation = makeFieldMutation(metadata.getRyaInstanceName(), STRING_LEXICODER, MERGE_PARENT_METADATA_RYA_INSTANCE_NAME); + mutations.add(ryaInstanceNameMutation); + + // Timestamp + final Mutation timestampMutation= makeFieldMutation(metadata.getTimestamp(), DATE_LEXICODER, MERGE_PARENT_METADATA_TIMESTAMP); + mutations.add(timestampMutation); + + // Filter Timestamp + if (metadata.getFilterTimestamp() != null) { + final Mutation filterTimestampMutation = makeFieldMutation(metadata.getFilterTimestamp(), DATE_LEXICODER, MERGE_PARENT_METADATA_FILTER_TIMESTAMP); + mutations.add(filterTimestampMutation); + } + + // Parent Time Offset + if (metadata.getParentTimeOffset() != null) { + final Mutation parentTimeOffsetMutation = makeFieldMutation(metadata.getParentTimeOffset(), LONG_LEXICODER, MERGE_PARENT_METADATA_PARENT_TIME_OFFSET); + 
mutations.add(parentTimeOffsetMutation); + } + + return mutations; + } + + private static <T> Mutation makeFieldMutation(final T object, final Lexicoder<T> lexicoder, final Text columnQualifer) { + final Mutation mutation = new Mutation(MERGE_PARENT_METADATA_ROW_ID); + final Value value = new Value(lexicoder.encode(object)); + mutation.put(MERGE_PARENT_METADATA_FAMILY, columnQualifer, value); + return mutation; + } + + private void writeMetadata(final MergeParentMetadata metadata) throws MergerException { + BatchWriter writer = null; + try{ + // Write each result. + final List<Mutation> mutations = makeWriteMetadataMutations(metadata); + + writer = connector.createBatchWriter(mergeParentMetadataTableName, new BatchWriterConfig()); + writer.addMutations(mutations); + } catch (final AccumuloException | TableNotFoundException e) { + throw new MergerException("Unable to set MergeParentMetadata in Accumulo", e); + } finally { + if (writer != null) { + try { + writer.close(); + } catch (final MutationsRejectedException e) { + throw new MergerException("Could not add results to a MergeParentMetadata table because some of the mutations were rejected.", e); + } + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloInstanceDriver.java ---------------------------------------------------------------------- diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloInstanceDriver.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloInstanceDriver.java new file mode 100644 index 0000000..4c086d8 --- /dev/null +++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloInstanceDriver.java @@ -0,0 +1,603 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.export.accumulo.util;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.log4j.Logger;
import org.apache.rya.export.accumulo.common.InstanceType;
import org.apache.rya.export.accumulo.conf.AccumuloExportConstants;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;

import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.accumulo.AccumuloRyaDAO;
import mvm.rya.accumulo.mr.MRUtils;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.persist.RyaDAOException;

/**
 * Handles running a single {@link MiniAccumuloCluster} or a single {@link MockInstance} for an instance.
 */
public class AccumuloInstanceDriver {
    private static final Logger log = Logger.getLogger(AccumuloInstanceDriver.class);

    // Workaround toggle for Windows; see copyHadoopHomeToTemp().
    private static final boolean IS_COPY_HADOOP_HOME_ENABLED = true;

    public static final String ROOT_USER_NAME = "root";

    private final String driverName;
    private final InstanceType instanceType;
    private final boolean isMock;
    private final boolean shouldCreateIndices;
    private final boolean isReadOnly;
    private final boolean isParent;

    private final String user;
    private final String password;
    private final String instanceName;
    private final String tablePrefix;
    private final String auth;

    private Connector connector;

    private AccumuloRyaDAO dao;

    private SecurityOperations secOps;

    private final AccumuloRdfConfiguration config = new AccumuloRdfConfiguration();

    private MiniAccumuloCluster miniAccumuloCluster = null;

    private MockInstance mockInstance = null;

    private ZooKeeperInstance zooKeeperInstance = null;

    private Instance instance = null;

    // Not final: for MINI/MOCK instances this is overwritten in setUpInstance()
    // with the value reported by the running instance.
    private String zooKeepers;

    private final Map<String, String> configMap = new LinkedHashMap<>();

    private List<String> indices = null;

    private final List<String> tableList = new ArrayList<>();

    // Temp directory backing a MiniAccumuloCluster; null for other instance types.
    private File tempDir = null;

    public static final List<String> TABLE_NAME_SUFFIXES =
        ImmutableList.<String>of(
            RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_PO_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_NS_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_STATS_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_SEL_SUFFIX
        );

    /**
     * Creates a new instance of {@link AccumuloInstanceDriver}.
     * @param driverName the name used to identify this driver in the logs. (not {@code null})
     * @param instanceType the {@link InstanceType} of this driver.
     * @param shouldCreateIndices {@code true} to create all the indices associated with a Rya deployment.
     * {@code false} otherwise.
     * @param isReadOnly {@code true} if all the tables in the instance should have their
     * table permissions set to read only. {@code false} if the table permission are set to write.
     * @param isParent {@code true} if the instance is the parent/main instance. {@code false} if it's the
     * child.
     * @param user the user name tied to this instance.
     * @param password the password for the user.
     * @param instanceName the name of the instance.
     * @param tablePrefix the table prefix.
     * @param auth the comma-separated authorization list.
     * @param zooKeepers the comma-separated list of zoo keeper host names.
     */
    public AccumuloInstanceDriver(final String driverName, final InstanceType instanceType, final boolean shouldCreateIndices, final boolean isReadOnly, final boolean isParent, final String user, final String password, final String instanceName, final String tablePrefix, final String auth, final String zooKeepers) {
        this.driverName = Preconditions.checkNotNull(driverName);
        this.instanceType = instanceType;
        this.isMock = instanceType.isMock();
        this.shouldCreateIndices = shouldCreateIndices;
        this.isReadOnly = isReadOnly;
        this.user = user;
        this.password = password;
        this.instanceName = instanceName;
        this.tablePrefix = tablePrefix;
        this.auth = auth;
        this.zooKeepers = zooKeepers;
        this.isParent = isParent;

        config.setTablePrefix(tablePrefix);
    }

    /**
     * Sets up the {@link AccumuloInstanceDriver}.
     * Order matters: the instance must exist before tables, tables before the DAO.
     * @throws Exception
     */
    public void setUp() throws Exception {
        setUpInstance();
        setUpTables();
        setUpDao();
        setUpConfig();
    }

    /**
     * Sets up the {@link MiniAccumuloCluster} or the {@link MockInstance} or
     * distribution instance
     * @throws Exception
     */
    public void setUpInstance() throws Exception {
        switch (instanceType) {
            case DISTRIBUTION:
                log.info("Setting up " + driverName + " distribution instance...");
                // An existing cluster is addressed by name + ZooKeepers; both are required.
                if (instanceName == null) {
                    throw new IllegalArgumentException("Must specify instance name for distributed mode");
                } else if (zooKeepers == null) {
                    throw new IllegalArgumentException("Must specify ZooKeeper hosts for distributed mode");
                }
                instance = new ZooKeeperInstance(instanceName, zooKeepers);
                connector = instance.getConnector(user, new PasswordToken(password));
                log.info("Created connector to " + driverName + " distribution instance");
                break;
            case MINI:
                log.info("Setting up " + driverName + " MiniAccumulo cluster...");
                // Create and Run MiniAccumulo Cluster
                tempDir = Files.createTempDir();
                tempDir.deleteOnExit();
                miniAccumuloCluster = new MiniAccumuloCluster(tempDir, password);
                copyHadoopHomeToTemp();
                miniAccumuloCluster.getConfig().setInstanceName(instanceName);
                log.info(driverName + " MiniAccumulo instance starting up...");
                miniAccumuloCluster.start();
                // NOTE(review): fixed delay after start() — presumably to let the
                // minicluster settle before connecting; confirm it is still needed.
                Thread.sleep(1000);
                log.info(driverName + " MiniAccumulo instance started");
                log.info("Creating connector to " + driverName + " MiniAccumulo instance...");
                zooKeeperInstance = new ZooKeeperInstance(miniAccumuloCluster.getClientConfig());
                instance = zooKeeperInstance;
                connector = zooKeeperInstance.getConnector(user, new PasswordToken(password));
                log.info("Created connector to " + driverName + " MiniAccumulo instance");
                break;
            case MOCK:
                log.info("Setting up " + driverName + " mock instance...");
                mockInstance = new MockInstance(instanceName);
                instance = mockInstance;
                connector = mockInstance.getConnector(user, new PasswordToken(password));
                log.info("Created connector to " + driverName + " mock instance");
                break;
            default:
                throw new AccumuloException("Unexpected instance type: " + instanceType);
        }
        // Record the ZooKeepers actually used by whichever instance was created.
        zooKeepers = instance.getZooKeepers();
    }

    /**
     * Copies the HADOOP_HOME bin directory to the {@link MiniAccumuloCluster} temp directory.
     * {@link MiniAccumuloCluster} expects to find bin/winutils.exe in the MAC temp
     * directory instead of HADOOP_HOME for some reason.
     * @throws IOException
     */
    private void copyHadoopHomeToTemp() throws IOException {
        // Only relevant on Windows; a missing HADOOP_HOME is logged, not fatal.
        if (IS_COPY_HADOOP_HOME_ENABLED && SystemUtils.IS_OS_WINDOWS) {
            final String hadoopHomeEnv = System.getenv("HADOOP_HOME");
            if (hadoopHomeEnv != null) {
                final File hadoopHomeDir = new File(hadoopHomeEnv);
                if (hadoopHomeDir.exists()) {
                    final File binDir = Paths.get(hadoopHomeDir.getAbsolutePath(), "/bin").toFile();
                    if (binDir.exists()) {
                        FileUtils.copyDirectoryToDirectory(binDir, tempDir);
                    } else {
                        log.warn("The specified path for the Hadoop bin directory does not exist: " + binDir.getAbsolutePath());
                    }
                } else {
                    log.warn("The specified path for HADOOP_HOME does not exist: " + hadoopHomeDir.getAbsolutePath());
                }
            } else {
                log.warn("The HADOOP_HOME environment variable was not found.");
            }
        }
    }

    /**
     * Sets up all the tables and indices.
     * @throws Exception
     */
    public void setUpTables() throws Exception {
        // Setup tables and permissions
        log.info("Setting up " + driverName + " tables and permissions");
        for (final String tableSuffix : TABLE_NAME_SUFFIXES) {
            final String tableName = tablePrefix + tableSuffix;
            tableList.add(tableName);
            if (!connector.tableOperations().exists(tableName)) {
                connector.tableOperations().create(tableName);
            }
        }

        if (shouldCreateIndices) {
            // Currently an empty list: all index tables are disabled pending RYA-160.
            indices = Arrays.asList(
                    /* TODO: SEE RYA-160
                    ConfigUtils.getFreeTextDocTablename(config),
                    ConfigUtils.getFreeTextTermTablename(config),
                    ConfigUtils.getGeoTablename(config),
                    ConfigUtils.getTemporalTableName(config),
                    ConfigUtils.getEntityTableName(config)
                    */
            );

            tableList.addAll(indices);

            log.info("Setting up " + driverName + " indices");
            for (final String index : indices) {
                if (!connector.tableOperations().exists(index)) {
                    connector.tableOperations().create(index);
                }
            }
        }

        // Setup user with authorizations
        log.info("Creating " + driverName + " user and authorizations");
        secOps = connector.securityOperations();
        // The root user already exists; only create non-root users.
        if (!user.equals(ROOT_USER_NAME)) {
            secOps.createLocalUser(user, new PasswordToken(password));
        }
        addAuths(auth);
        final TablePermission tablePermission = isReadOnly ? TablePermission.READ : TablePermission.WRITE;
        for (final String tableSuffix : TABLE_NAME_SUFFIXES) {
            secOps.grantTablePermission(user, tablePrefix + tableSuffix, tablePermission);
        }
        if (shouldCreateIndices) {
            for (final String index : indices) {
                secOps.grantTablePermission(user, index, tablePermission);
            }
        }
    }

    /**
     * Sets up the {@link AccumuloRyaDAO}.
     * @throws Exception
     */
    public void setUpDao() throws Exception {
        // Setup dao
        log.info("Creating " + driverName + " DAO");
        dao = new AccumuloRyaDAO();
        dao.setConnector(connector);
        dao.setConf(config);

        // Flush the tables before initializing the DAO
        for (final String tableName : tableList) {
            connector.tableOperations().flush(tableName, null, null, false);
        }

        dao.init();
    }

    /**
     * Sets up the configuration and prints the arguments.
     */
    public void setUpConfig() {
        log.info("Setting " + driverName + " config");

        // Setup config
        if (isMock) {
            configMap.put(MRUtils.AC_MOCK_PROP, Boolean.TRUE.toString());
        }
        configMap.put(MRUtils.AC_INSTANCE_PROP, instanceName);
        configMap.put(MRUtils.AC_USERNAME_PROP, user);
        configMap.put(MRUtils.AC_PWD_PROP, password);
        configMap.put(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
        configMap.put(MRUtils.AC_AUTH_PROP, auth);
        configMap.put(MRUtils.AC_ZK_PROP, zooKeepers != null ? zooKeepers : "localhost");
        configMap.put(AccumuloExportConstants.ACCUMULO_INSTANCE_TYPE_PROP, instanceType.toString());

        log.info(driverName + " config properties");
        config.setTablePrefix(tablePrefix);
        // Copy every property into the Hadoop config and log it in -Dkey=value form.
        for (final Entry<String, String> entry : configMap.entrySet()) {
            final String key = entry.getKey();
            final String value = entry.getValue();
            final String argument = "-D" + key + "=" + value;
            log.info(argument);
            config.set(key, value);
        }

        AccumuloExportConstants.setDuplicateKeys(config);
    }

    /**
     * Tears down all the tables and indices.
     * @throws Exception
     */
    public void tearDownTables() throws Exception {
        // delete all tables.
        if (connector != null) {
            for (final String tableName : tableList) {
                if (connector.tableOperations().exists(tableName)) {
                    connector.tableOperations().delete(tableName);
                }
            }
        }
    }

    /**
     * Tears down the {@link AccumuloRyaDAO}.
     * @throws Exception
     */
    public void tearDownDao() throws Exception {
        if (dao != null) {
            log.info("Stopping " + driverName + " DAO");
            try {
                dao.destroy();
            } catch (final RyaDAOException e) {
                // Best-effort teardown: log and continue so the rest can be cleaned up.
                log.error("Error stopping " + driverName + " DAO", e);
            }
            dao = null;
        }
    }

    /**
     * Tears down the instance.
     * @throws Exception
     */
    public void tearDownInstance() throws Exception {
        if (miniAccumuloCluster != null) {
            log.info("Stopping " + driverName + " cluster");
            try {
                miniAccumuloCluster.stop();
            } catch (IOException | InterruptedException e) {
                // Best-effort teardown: log and continue.
                log.error("Error stopping " + driverName + " cluster", e);
            }
            miniAccumuloCluster = null;
        }
    }

    /**
     * Tears down the {@link AccumuloInstanceDriver}.
     * The temp directory is removed even if DAO/instance teardown throws.
     * @throws Exception
     */
    public void tearDown() throws Exception {
        try {
            //tearDownTables();
            tearDownDao();
            tearDownInstance();
        } finally {
            removeTempDir();
        }
    }

    /**
     * Deletes the {@link MiniAccumuloCluster} temporary directory.
     */
    public void removeTempDir() {
        if (tempDir != null) {
            try {
                FileUtils.deleteDirectory(tempDir);
            } catch (final IOException e) {
                log.error("Error deleting " + driverName + " temp directory", e);
            }
            tempDir = null;
        }
    }

    /**
     * Adds authorizations to the {@link SecurityOperations} of this instance's user.
     * @param auths the list of authorizations to add.
     * @throws AccumuloException
     * @throws AccumuloSecurityException
     */
    public void addAuths(final String... auths) throws AccumuloException, AccumuloSecurityException {
        final Authorizations newAuths = AccumuloRyaUtils.addUserAuths(user, secOps, auths);
        secOps.changeUserAuthorizations(user, newAuths);
    }

    /**
     * @return the {@link Authorizations} of this instance's user, or {@code null}
     * if security operations have not been initialized yet.
     * @throws AccumuloException
     * @throws AccumuloSecurityException
     */
    public Authorizations getAuths() throws AccumuloException, AccumuloSecurityException {
        if (secOps != null) {
            return secOps.getUserAuthorizations(user);
        } else {
            return null;
        }
    }

    /**
     * @return the {@link InstanceType}.
     */
    public InstanceType getInstanceType() {
        return instanceType;
    }

    /**
     * @return {@code true} if this is a mock instance. {@code false} if this is a MiniAccumuloCluster instance.
     */
    public boolean isMock() {
        return isMock;
    }

    /**
     * @return {@code true} to create all the indices associated with a Rya deployment.
     * {@code false} otherwise.
     */
    public boolean shouldCreateIndices() {
        return shouldCreateIndices;
    }

    /**
     * @return {@code true} if all the tables in the instance should have their
     * table permissions set to read only. {@code false} if the table permission are set to write.
     */
    public boolean isReadOnly() {
        return isReadOnly;
    }

    /**
     * @return the user name tied to this instance
     */
    public String getUser() {
        return user;
    }

    /**
     * @return the password for the user.
     */
    public String getPassword() {
        return password;
    }

    /**
     * @return the name of the instance.
     */
    public String getInstanceName() {
        return instanceName;
    }

    /**
     * @return the table prefix.
     */
    public String getTablePrefix() {
        return tablePrefix;
    }

    /**
     * @return the comma-separated authorization list.
     */
    public String getAuth() {
        return auth;
    }

    /**
     * @return the {@link Connector} to this instance.
     */
    public Connector getConnector() {
        return connector;
    }

    /**
     * Sets the {@link Connector} to this instance.
     * @param connector the {@link Connector}.
     */
    public void setConnector(final Connector connector) {
        this.connector = connector;
    }

    /**
     * @return the {@link AccumuloRyaDAO}.
     */
    public AccumuloRyaDAO getDao() {
        return dao;
    }

    /**
     * @return the {@link SecurityOperations}.
     */
    public SecurityOperations getSecOps() {
        return secOps;
    }

    /**
     * @return the {@link AccumuloRdfConfiguration}.
     */
    public AccumuloRdfConfiguration getConfig() {
        return config;
    }

    /**
     * @return the {@link MiniAccumuloCluster} for this instance or {@code null} if
     * this is a {@link MockInstance}.
     */
    public MiniAccumuloCluster getMiniAccumuloCluster() {
        return miniAccumuloCluster;
    }

    /**
     * @return the {@link MockInstance} for this instance or {@code null} if
     * this is a {@link MiniAccumuloCluster}.
     */
    public MockInstance getMockInstance() {
        return mockInstance;
    }

    /**
     * @return the {@link ZooKeeperInstance} for this instance or {@code null} if
     * this is a {@link MockInstance}.
     */
    public ZooKeeperInstance getZooKeeperInstance() {
        return zooKeeperInstance;
    }

    /**
     * @return the {@link ZooKeeperInstance} or {@link MockInstance}.
     */
    public Instance getInstance() {
        return instance;
    }

    /**
     * @return the comma-separated list of zoo keeper host names.
     */
    public String getZooKeepers() {
        return zooKeepers;
    }

    /**
     * @return an unmodifiable map of the configuration keys and values.
     */
    public Map<String, String> getConfigMap() {
        return Collections.unmodifiableMap(configMap);
    }

    /**
     * @return an unmodifiable list of the table names and indices.
     */
    public List<String> getTableList() {
        return Collections.unmodifiableList(tableList);
    }

    /**
     * @return the {@link MiniAccumuloCluster} temporary directory for this instance or {@code null}
     * if it's a {@link MockInstance}.
     */
    public File getTempDir() {
        return tempDir;
    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
new file mode 100644
index 0000000..c425227
--- /dev/null
+++ b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
@@ -0,0 +1,447 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.export.accumulo.util;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.openrdf.model.ValueFactory;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;

import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.accumulo.AccumuloRyaDAO;
import mvm.rya.accumulo.mr.MRUtils;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.persist.RyaDAOException;
import mvm.rya.api.resolver.RdfToRyaConversions;
import mvm.rya.api.resolver.RyaTripleContext;
import mvm.rya.api.resolver.triple.TripleRow;
import mvm.rya.api.resolver.triple.TripleRowResolverException;
import mvm.rya.indexing.accumulo.ConfigUtils;

/**
 * Utility methods for an Accumulo Rya instance.
 */
public final class AccumuloRyaUtils {
    private static final Logger log = Logger.getLogger(AccumuloRyaUtils.class);

    private static final String NAMESPACE = RdfCloudTripleStoreConstants.NAMESPACE;
    private static final ValueFactory VALUE_FACTORY = RdfCloudTripleStoreConstants.VALUE_FACTORY;

    /**
     * Ignore the meta statements indicating the Rya version and copy time values.
     */
    public static final ImmutableSet<IteratorSetting> COMMON_REG_EX_FILTER_SETTINGS = ImmutableSet.of(
        getVersionRegExFilterSetting()
    );

    /**
     * Private constructor to prevent instantiation.
     */
    private AccumuloRyaUtils() {
    }

    /**
     * Creates a {@link RyaURI} for the specified local name.
     * Uses the default Rya namespace.
     * @param localName the URI's local name.
     * @return the {@link RyaURI}.
     */
    public static RyaURI createRyaUri(final String localName) {
        return createRyaUri(NAMESPACE, localName);
    }

    /**
     * Creates a {@link RyaURI} for the specified local name.
     * @param namespace the namespace.
     * @param localName the URI's local name.
     * @return the {@link RyaURI}.
     */
    public static RyaURI createRyaUri(final String namespace, final String localName) {
        return RdfToRyaConversions.convertURI(VALUE_FACTORY.createURI(namespace, localName));
    }

    /**
     * Converts a {@link RyaURI} to the contained data string.
     * Strips the default Rya namespace.
     * @param ryaUri the {@link RyaURI} to convert.
     * @return the data value without the namespace.
     */
    public static String convertRyaUriToString(final RyaURI ryaUri) {
        return convertRyaUriToString(NAMESPACE, ryaUri);
    }

    /**
     * Converts a {@link RyaURI} to the contained data string.
     * Only the first occurrence of the namespace is removed.
     * @param namespace the namespace.
     * @param ryaUri the {@link RyaURI} to convert.
     * @return the data value without the namespace.
     */
    public static String convertRyaUriToString(final String namespace, final RyaURI ryaUri) {
        return StringUtils.replaceOnce(ryaUri.getData(), namespace, "");
    }

    /**
     * Creates a {@link RyaStatement} from a {@link Key}/{@link Value} pair.
     * Empty key/value components are normalized to {@code null} before
     * deserializing with the SPO table layout.
     * @param key the {@link Key}.
     * @param value the {@link Value}.
     * @param ryaTripleContext the {@link RyaTripleContext}.
     * @return the converted {@link RyaStatement}.
     * @throws TripleRowResolverException
     */
    public static RyaStatement createRyaStatement(final Key key, final Value value, final RyaTripleContext ryaTripleContext) throws TripleRowResolverException {
        final byte[] row = key.getRowData() != null && key.getRowData().toArray().length > 0 ? key.getRowData().toArray() : null;
        final byte[] columnFamily = key.getColumnFamilyData() != null && key.getColumnFamilyData().toArray().length > 0 ? key.getColumnFamilyData().toArray() : null;
        final byte[] columnQualifier = key.getColumnQualifierData() != null && key.getColumnQualifierData().toArray().length > 0 ? key.getColumnQualifierData().toArray() : null;
        final Long timestamp = key.getTimestamp();
        final byte[] columnVisibility = key.getColumnVisibilityData() != null && key.getColumnVisibilityData().toArray().length > 0 ? key.getColumnVisibilityData().toArray() : null;
        final byte[] valueBytes = value != null && value.get().length > 0 ? value.get() : null;
        final TripleRow tripleRow = new TripleRow(row, columnFamily, columnQualifier, timestamp, columnVisibility, valueBytes);
        final RyaStatement ryaStatement = ryaTripleContext.deserializeTriple(TABLE_LAYOUT.SPO, tripleRow);

        return ryaStatement;
    }

    /**
     * Creates a {@link RegExFilter} setting to ignore the version row in a table.
     * The filter is negated so matching (version) rows are dropped rather than kept.
     * @return the {@link RegExFilter} {@link IteratorSetting}.
     */
    public static IteratorSetting getVersionRegExFilterSetting() {
        final IteratorSetting regex = new IteratorSetting(30, "version_regex", RegExFilter.class);
        RegExFilter.setRegexs(regex, "(.*)urn:(.*)#version[\u0000|\u0001](.*)", null, null, null, false);
        RegExFilter.setNegate(regex, true);
        return regex;
    }

    /**
     * Adds all the common regex filter {@link IteratorSetting}s to the provided {@link Scanner} so
     * certain metadata keys in a table are ignored.
     * @param scanner the {@link Scanner} to add the regex filter {@link IteratorSetting}s to.
     */
    public static void addCommonScannerIteratorsTo(final Scanner scanner) {
        for (final IteratorSetting iteratorSetting : COMMON_REG_EX_FILTER_SETTINGS) {
            scanner.addScanIterator(iteratorSetting);
        }
    }

    /**
     * Creates a {@link Scanner} of the provided table name using the specified {@link Configuration}.
     * This applies common iterator settings to the table scanner that ignore internal metadata keys.
     * @param tableName the name of the table to scan.
     * @param config the {@link Configuration}.
     * @return the {@link Scanner} for the table.
     * @throws IOException
     */
    public static Scanner getScanner(final String tableName, final Configuration config) throws IOException {
        return getScanner(tableName, config, true);
    }

    /**
     * Creates a {@link Scanner} of the provided table name using the specified {@link Configuration}.
     * Builds a new {@link Connector} (mock or ZooKeeper-backed depending on the config) per call.
     * @param tableName the name of the table to scan.
     * @param config the {@link Configuration}.
     * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner.
     * {@code false} otherwise.
     * @return the {@link Scanner} for the table.
     * @throws IOException
     */
    public static Scanner getScanner(final String tableName, final Configuration config, final boolean shouldAddCommonIterators) throws IOException {
        try {
            final String instanceName = config.get(ConfigUtils.CLOUDBASE_INSTANCE);
            final String zooKeepers = config.get(ConfigUtils.CLOUDBASE_ZOOKEEPERS);
            Instance instance;
            if (ConfigUtils.useMockInstance(config)) {
                instance = new MockInstance(instanceName);
            } else {
                instance = new ZooKeeperInstance(new ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers));
            }
            final String username = ConfigUtils.getUsername(config);
            final String password = ConfigUtils.getPassword(config);
            final Connector connector = instance.getConnector(username, new PasswordToken(password));
            final Authorizations auths = ConfigUtils.getAuthorizations(config);

            final Scanner scanner = connector.createScanner(tableName, auths);
            if (shouldAddCommonIterators) {
                AccumuloRyaUtils.addCommonScannerIteratorsTo(scanner);
            }
            return scanner;
        } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
            log.error("Error connecting to " + tableName);
            throw new IOException(e);
        }
    }

    /**
     * Prints the table with the specified config and additional settings.
     * This applies common iterator settings to the table scanner that ignore internal metadata keys.
     * @param tableName the name of the table to print.
     * @param config the {@link AccumuloRdfConfiguration}.
     * @param settings the additional {@link IteratorSetting}s to add besides the common ones.
     * @throws IOException
     */
    public static void printTable(final String tableName, final AccumuloRdfConfiguration config, final IteratorSetting...
settings) throws IOException { + printTable(tableName, config, true, settings); + } + + /** + * Prints the table with the specified config and additional settings. + * @param tableName the name of the table to print. + * @param config the {@link AccumuloRdfConfiguration}. + * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner. + * {@code false} otherwise. + * @param settings the additional {@link IteratorSetting}s to add besides the common ones. + * @throws IOException + */ + public static void printTable(final String tableName, final AccumuloRdfConfiguration config, final boolean shouldAddCommonIterators, final IteratorSetting... settings) throws IOException { + final Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config, shouldAddCommonIterators); + for (final IteratorSetting setting : settings) { + scanner.addScanIterator(setting); + } + + final Iterator<Entry<Key, Value>> iterator = scanner.iterator(); + + final String instance = config.get(MRUtils.AC_INSTANCE_PROP); + log.info("=================="); + log.info("TABLE: " + tableName + " INSTANCE: " + instance); + log.info("------------------"); + while (iterator.hasNext()) { + final Entry<Key, Value> entry = iterator.next(); + final Key key = entry.getKey(); + final Value value = entry.getValue(); + final String keyString = getFormattedKeyString(key); + log.info(keyString + " - " + value); + } + log.info("=================="); + } + + private static String getFormattedKeyString(final Key key) { + final StringBuilder sb = new StringBuilder(); + final byte[] row = key.getRow().getBytes(); + final byte[] colFamily = key.getColumnFamily().getBytes(); + final byte[] colQualifier = key.getColumnQualifier().getBytes(); + final byte[] colVisibility = key.getColumnVisibility().getBytes(); + final int maxRowDataToPrint = 256; + Key.appendPrintableString(row, 0, row.length, maxRowDataToPrint, sb); + sb.append(" "); + Key.appendPrintableString(colFamily, 0, 
colFamily.length, maxRowDataToPrint, sb); + sb.append(":"); + Key.appendPrintableString(colQualifier, 0, colQualifier.length, maxRowDataToPrint, sb); + sb.append(" ["); + Key.appendPrintableString(colVisibility, 0, colVisibility.length, maxRowDataToPrint, sb); + sb.append("]"); + sb.append(" "); + sb.append(new Date(key.getTimestamp())); + //sb.append(Long.toString(key.getTimestamp())); + //sb.append(" "); + //sb.append(key.isDeleted()); + return sb.toString(); + } + + /** + * Prints the table with pretty formatting using the specified config and additional settings. + * This applies common iterator settings to the table scanner that ignore internal metadata keys. + * @param tableName the name of the table to print. + * @param config the {@link AccumuloRdfConfiguration}. + * @param settings the additional {@link IteratorSetting}s to add besides the common ones. + * @throws IOException + */ + public static void printTablePretty(final String tableName, final Configuration config, final IteratorSetting... settings) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException { + printTablePretty(tableName, config, true, settings); + } + + /** + * Prints the table with pretty formatting using the specified config and additional settings. + * @param tableName the name of the table to print. + * @param config the {@link AccumuloRdfConfiguration}. + * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner. + * {@code false} otherwise. + * @param settings the additional {@link IteratorSetting}s to add besides the common ones. + * @throws IOException + */ + public static void printTablePretty(final String tableName, final Configuration config, final boolean shouldAddCommonIterators, final IteratorSetting... 
settings) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException { + final Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config, shouldAddCommonIterators); + for (final IteratorSetting setting : settings) { + scanner.addScanIterator(setting); + } + + final String format = "| %-64s | %-24s | %-28s | %-20s | %-20s | %-10s |"; + final int totalFormatLength = String.format(format, 1, 2, 3, 4, 5, 6).length(); + final String instance = config.get(MRUtils.AC_INSTANCE_PROP); + log.info(StringUtils.rightPad("==================", totalFormatLength, "=")); + log.info(StringUtils.rightPad("| TABLE: " + tableName + " INSTANCE: " + instance, totalFormatLength - 1) + "|"); + log.info(StringUtils.rightPad("------------------", totalFormatLength, "-")); + log.info(String.format(format, "--Row--", "--ColumnVisibility--", "--Timestamp--", "--ColumnFamily--", "--ColumnQualifier--", "--Value--")); + log.info(StringUtils.rightPad("|-----------------", totalFormatLength - 1, "-") + "|"); + for (final Entry<Key, Value> entry : scanner) { + final Key k = entry.getKey(); + final String rowString = Key.appendPrintableString(k.getRow().getBytes(), 0, k.getRow().getLength(), Constants.MAX_DATA_TO_PRINT, new StringBuilder()).toString(); + log.info(String.format(format, rowString, k.getColumnVisibility(), new Date(k.getTimestamp()), k.getColumnFamily(), k.getColumnQualifier(), entry.getValue())); + } + log.info(StringUtils.rightPad("==================", totalFormatLength, "=")); + } + + /** + * Adds authorizations to a user's authorizations list. + * @param user the name of the user to add authorizations for. + * @param secOps the {@link SecurityOperations}. + * @param auths the {@link Authorizations} to add + * @return the {@link Authorizations}. 
+ * @throws AccumuloException + * @throws AccumuloSecurityException + */ + public static Authorizations addUserAuths(final String user, final SecurityOperations secOps, final Authorizations auths) throws AccumuloException, AccumuloSecurityException { + final List<String> authList = new ArrayList<>(); + for (final byte[] authBytes : auths.getAuthorizations()) { + final String auth = new String(authBytes); + authList.add(auth); + } + return addUserAuths(user, secOps, authList.toArray(new String[0])); + } + + /** + * Adds authorizations to a user's authorizations list. + * @param user the name of the user to add authorizations for. + * @param secOps the {@link SecurityOperations}. + * @param auths the list of authorizations to add + * @return the {@link Authorizations}. + * @throws AccumuloException + * @throws AccumuloSecurityException + */ + public static Authorizations addUserAuths(final String user, final SecurityOperations secOps, final String... auths) throws AccumuloException, AccumuloSecurityException { + final Authorizations currentUserAuths = secOps.getUserAuthorizations(user); + final List<byte[]> authList = new ArrayList<>(); + for (final byte[] currentAuth : currentUserAuths.getAuthorizations()) { + authList.add(currentAuth); + } + for (final String newAuth : auths) { + authList.add(newAuth.getBytes()); + } + final Authorizations result = new Authorizations(authList); + return result; + } + + /** + * Removes the specified authorizations from the user. + * @param userName the name of the user to change authorizations for. + * @param secOps the {@link SecurityOperations} to change. + * @param authsToRemove the comma-separated string of authorizations to remove. 
+ * @throws AccumuloSecurityException + * @throws AccumuloException + */ + public static void removeUserAuths(final String userName, final SecurityOperations secOps, final String authsToRemove) throws AccumuloException, AccumuloSecurityException { + final Authorizations currentUserAuths = secOps.getUserAuthorizations(userName); + final List<String> authList = convertAuthStringToList(currentUserAuths.toString()); + + final List<String> authsToRemoveList = convertAuthStringToList(authsToRemove); + authList.removeAll(authsToRemoveList); + + final String authString = Joiner.on(",").join(authList); + final Authorizations newAuths = new Authorizations(authString); + + secOps.changeUserAuthorizations(userName, newAuths); + } + + /** + * Convert the comma-separated string of authorizations into a list of authorizations. + * @param authString the comma-separated string of authorizations. + * @return a {@link List} of authorization strings. + */ + public static List<String> convertAuthStringToList(final String authString) { + final List<String> authList = new ArrayList<>(); + if (authString != null) { + final String[] authSplit = authString.split(","); + authList.addAll(Arrays.asList(authSplit)); + } + return authList; + } + + /** + * Sets up a {@link Connector} with the specified config. + * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}. + * @return the {@link Connector}. + */ + public static Connector setupConnector(final AccumuloRdfConfiguration accumuloRdfConfiguration) { + Connector connector = null; + try { + connector = ConfigUtils.getConnector(accumuloRdfConfiguration); + } catch (AccumuloException | AccumuloSecurityException e) { + log.error("Error creating connector", e); + } + + return connector; + } + + /** + * Sets up a {@link AccumuloRyaDAO} with the specified connector. + * @param connector the {@link Connector}. + * @return the {@link AccumuloRyaDAO}. 
+ */ + public static AccumuloRyaDAO setupDao(final AccumuloRdfConfiguration accumuloRdfConfiguration) { + final Connector connector = setupConnector(accumuloRdfConfiguration); + return setupDao(connector, accumuloRdfConfiguration); + } + + /** + * Sets up a {@link AccumuloRyaDAO} with the specified connector and config. + * @param connector the {@link Connector}. + * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}. + * @return the {@link AccumuloRyaDAO}. + */ + public static AccumuloRyaDAO setupDao(final Connector connector, final AccumuloRdfConfiguration accumuloRdfConfiguration) { + final AccumuloRyaDAO accumuloRyaDao = new AccumuloRyaDAO(); + accumuloRyaDao.setConnector(connector); + accumuloRyaDao.setConf(accumuloRdfConfiguration); + + try { + accumuloRyaDao.init(); + } catch (final RyaDAOException e) { + log.error("Error initializing DAO", e); + } + + return accumuloRyaDao; + } +} \ No newline at end of file