RYA-126 Accumulo side implementation of export.api. (#15)

Project: http://git-wip-us.apache.org/repos/asf/incubator-rya/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rya/commit/f2b046f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rya/tree/f2b046f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rya/diff/f2b046f3

Branch: refs/heads/master
Commit: f2b046f36765227571a4ba5155e9fd368a8cb16a
Parents: 2c37b24
Author: ejwhite922 <eric.wh...@sparta.com>
Authored: Thu Sep 8 13:14:10 2016 -0400
Committer: isper3at <smith...@gmail.com>
Committed: Wed Nov 2 17:51:18 2016 -0400

----------------------------------------------------------------------
 extras/rya.export/export.accumulo/pom.xml       | 160 +++++
 .../rya.export/export.accumulo/src/.gitignore   |   1 +
 .../accumulo/AccumuloRyaStatementStore.java     | 220 +++++++
 .../export/accumulo/common/InstanceType.java    |  62 ++
 .../accumulo/conf/AccumuloExportConstants.java  | 195 ++++++
 .../AccumuloParentMetadataRepository.java       | 265 ++++++++
 .../accumulo/util/AccumuloInstanceDriver.java   | 603 ++++++++++++++++++
 .../export/accumulo/util/AccumuloRyaUtils.java  | 447 +++++++++++++
 .../api/conf/AccumuloConfigurationAdapter.java  |  67 ++
 .../api/conf/AccumuloMergeConfiguration.java    | 182 ++++++
 .../src/main/xsd/AccumuloMergeConfiguration.xsd |  46 ++
 .../accumulo/AccumuloRyaStatementStoreTest.java | 384 ++++++++++++
 .../apache/rya/export/accumulo/TestUtils.java   |  80 +++
 .../driver/AccumuloDualInstanceDriver.java      | 621 +++++++++++++++++++
 ...muloParentMetadataRepositoryAdapterTest.java |  67 ++
 .../conf/AccumuloConfigurationAdapterTest.java  | 145 +++++
 extras/rya.export/pom.xml                       |   1 +
 17 files changed, 3546 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/pom.xml
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.accumulo/pom.xml 
b/extras/rya.export/export.accumulo/pom.xml
new file mode 100644
index 0000000..23598b0
--- /dev/null
+++ b/extras/rya.export/export.accumulo/pom.xml
@@ -0,0 +1,160 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project 
+    xmlns="http://maven.apache.org/POM/4.0.0"; 
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+    <parent>
+        <groupId>org.apache.rya</groupId>
+        <artifactId>rya.export.parent</artifactId>
+        <version>3.2.10-SNAPSHOT</version>
+    </parent>
+    
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>rya.export.accumulo</artifactId>
+    
+    <name>Apache Rya Export Accumulo</name>
+    <description>Contains the accumulo implementation of the export 
tool.</description>
+    
+    <dependencies>
+        <!-- Rya Runtime Dependencies. -->
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.export.api</artifactId>
+            <version>3.2.10-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.indexing</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rya</groupId>
+            <artifactId>rya.mapreduce</artifactId>
+        </dependency>
+
+        <!-- Log4j 2 bridge, api, and core. -->        
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-1.2-api</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>2.5</version>
+        </dependency> 
+        
+        <!--  Accumulo -->
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-core</artifactId>
+            <version>1.6.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo-start</artifactId>
+            <version>1.6.4</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.accumulo</groupId>
+            <artifactId>accumulo</artifactId>
+            <version>1.6.4</version>
+            <type>pom</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>0.9.1</version>
+        </dependency>
+
+        <!-- Testing dependencies. -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    
+    <build>
+        <plugins>
+            <!-- Use the pre-built 'jar-with-dependencies' assembly to package 
the dependent class files into the final jar. 
+                 This creates a jar file that can be deployed to Fluo without 
having to include any dependent jars. -->
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <addClasspath>true</addClasspath>
+                            <classpathLayoutType>custom</classpathLayoutType>
+                            
<customClasspathLayout>WEB-INF/lib/$${artifact.groupIdPath}/$${artifact.artifactId}-$${artifact.version}$${dashClassifier?}.$${artifact.extension}</customClasspathLayout>
+                        
+                            
<mainClass>org.apache.rya.indexing.pcj.fluo.PcjAdminClient</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>jaxb2-maven-plugin</artifactId>
+                <version>2.2</version>
+                <executions>
+                    <execution>
+                        <id>xjc</id>
+                        <goals>
+                            <goal>xjc</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <sources>
+                        
<source>src/main/xsd/AccumuloMergeConfiguration.xsd</source>
+                    </sources>
+                    <packageName>org.apache.rya.export</packageName>
+                    <outputDirectory>src/gen/java</outputDirectory>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/.gitignore
----------------------------------------------------------------------
diff --git a/extras/rya.export/export.accumulo/src/.gitignore 
b/extras/rya.export/export.accumulo/src/.gitignore
new file mode 100644
index 0000000..e8e450b
--- /dev/null
+++ b/extras/rya.export/export.accumulo/src/.gitignore
@@ -0,0 +1 @@
+gen/

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/AccumuloRyaStatementStore.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/AccumuloRyaStatementStore.java
 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/AccumuloRyaStatementStore.java
new file mode 100644
index 0000000..90c30ee
--- /dev/null
+++ 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/AccumuloRyaStatementStore.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.export.accumulo;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.log4j.Logger;
+import org.apache.rya.export.accumulo.common.InstanceType;
+import org.apache.rya.export.accumulo.util.AccumuloInstanceDriver;
+import org.apache.rya.export.accumulo.util.AccumuloRyaUtils;
+import org.apache.rya.export.api.MergerException;
+import org.apache.rya.export.api.store.AddStatementException;
+import org.apache.rya.export.api.store.ContainsStatementException;
+import org.apache.rya.export.api.store.FetchStatementException;
+import org.apache.rya.export.api.store.RemoveStatementException;
+import org.apache.rya.export.api.store.RyaStatementStore;
+import org.apache.rya.export.api.store.UpdateStatementException;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+import info.aduna.iteration.CloseableIteration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+/**
+ * Allows specific CRUD operations on an Accumulo {@link RyaStatement} storage
+ * system.
+ * <p>
+ * The operations specifically:
+ * <li>fetch all rya statements in the store</li>
+ * <li>add a rya statement to the store</li>
+ * <li>remove a rya statement from the store</li>
+ * <li>update an existing rya statement with a new one</li>
+ *
+ * One would use this {@link AccumuloRyaStatementStore} when they have an
+ * Accumulo database that is used when merging in data or exporting data.
+ */
+public class AccumuloRyaStatementStore implements RyaStatementStore {
+    private static final Logger log = 
Logger.getLogger(AccumuloRyaStatementStore.class);
+
+    private final AccumuloRyaDAO accumuloRyaDao;
+    private final String tablePrefix;
+    private final Set<IteratorSetting> iteratorSettings = new HashSet<>();
+    private final AccumuloInstanceDriver accumuloInstanceDriver;
+
+    /**
+     * Creates a new instance of {@link AccumuloRyaStatementStore}.
+     * @param instanceName the Accumulo instance name.
+     * @param username the Accumulo user name.
+     * @param password the Accumulo user's password.
+     * @param instanceType the {@link InstanceType}.
+     * @param tablePrefix the Rya instance's table prefix.
+     * @param auths the comma-separated list of Accumulo authorizations for the
+     * user.
+     * @param zooKeepers the comma-separated list of zoo keeper host names.
+     * @throws MergerException
+     */
+    public AccumuloRyaStatementStore(final String instanceName, final String 
username, final String password, final InstanceType instanceType, final String 
tablePrefix, final String auths, final String zooKeepers) throws 
MergerException {
+        this.tablePrefix = tablePrefix;
+        if (tablePrefix != null) {
+            RdfCloudTripleStoreConstants.prefixTables(tablePrefix);
+        }
+
+        final String driverName = instanceName + 
AccumuloRyaStatementStore.class.getSimpleName();
+        accumuloInstanceDriver = new AccumuloInstanceDriver(driverName, 
instanceType, true, false, true, username, password, instanceName, tablePrefix, 
auths, zooKeepers);
+        try {
+            accumuloInstanceDriver.setUp();
+        } catch (final Exception e) {
+            throw new MergerException(e);
+        }
+        accumuloRyaDao = accumuloInstanceDriver.getDao();
+    }
+
+    @Override
+    public Iterator<RyaStatement> fetchStatements() throws 
FetchStatementException {
+        try {
+            final RyaTripleContext ryaTripleContext = 
RyaTripleContext.getInstance(accumuloRyaDao.getConf());
+
+            Scanner scanner = null;
+            try {
+                scanner = AccumuloRyaUtils.getScanner(tablePrefix + 
RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX, accumuloRyaDao.getConf());
+                for (final IteratorSetting iteratorSetting : iteratorSettings) 
{
+                    scanner.addScanIterator(iteratorSetting);
+                }
+            } catch (final IOException e) {
+                throw new FetchStatementException("Unable to get scanner to 
fetch Rya Statements", e);
+            }
+            // Convert Entry iterator to RyaStatement iterator
+            final Iterator<Entry<Key, Value>> entryIter = scanner.iterator();
+            final Iterator<RyaStatement> ryaStatementIter = 
Iterators.transform(entryIter, new Function<Entry<Key, Value>, RyaStatement>() {
+               @Override
+               public RyaStatement apply(final Entry<Key, Value> entry) {
+                   final Key key = entry.getKey();
+                   final Value value = entry.getValue();
+                   RyaStatement ryaStatement = null;
+                   try {
+                       ryaStatement = AccumuloRyaUtils.createRyaStatement(key, 
value, ryaTripleContext);
+                   } catch (final TripleRowResolverException e) {
+                       log.error("Unable to convert the key/value pair into a 
Rya Statement", e);
+                   }
+                   return ryaStatement;
+               }
+            });
+            return ryaStatementIter;
+        } catch (final Exception e) {
+            throw new FetchStatementException("Failed to fetch statements.", 
e);
+        }
+    }
+
+    @Override
+    public void addStatement(final RyaStatement statement) throws 
AddStatementException {
+        try {
+            accumuloRyaDao.add(statement);
+        } catch (final RyaDAOException e) {
+            throw new AddStatementException("Unable to add the Rya Statement", 
e);
+        }
+    }
+
+    @Override
+    public void removeStatement(final RyaStatement statement) throws 
RemoveStatementException {
+        try {
+            accumuloRyaDao.delete(statement, accumuloRyaDao.getConf());
+        } catch (final RyaDAOException e) {
+            throw new RemoveStatementException("Unable to delete the Rya 
Statement", e);
+        }
+    }
+
+    @Override
+    public void updateStatement(final RyaStatement original, final 
RyaStatement update) throws UpdateStatementException {
+        try {
+            removeStatement(original);
+            addStatement(update);
+        } catch (final AddStatementException | RemoveStatementException e) {
+            throw new UpdateStatementException("Unable to update the Rya 
Statement", e);
+        }
+    }
+
+    @Override
+    public boolean containsStatement(final RyaStatement ryaStatement) throws 
ContainsStatementException {
+        try {
+            final RyaStatement resultRyaStatement = 
findStatement(ryaStatement);
+            return resultRyaStatement != null;
+        } catch (final RyaDAOException e) {
+            throw new ContainsStatementException("Encountered an error while 
querying for statement.", e);
+        }
+    }
+
+    public RyaStatement findStatement(final RyaStatement ryaStatement) throws 
RyaDAOException {
+        RyaStatement resultRyaStatement = null;
+        CloseableIteration<RyaStatement, RyaDAOException> iter = null;
+        try {
+            iter = accumuloRyaDao.getQueryEngine().query(ryaStatement, 
accumuloRyaDao.getConf());
+            if (iter.hasNext()) {
+                resultRyaStatement = iter.next();
+            }
+        } finally {
+            if (iter != null) {
+                iter.close();
+            }
+        }
+
+        return resultRyaStatement;
+    }
+
+    /**
+     * @return the {@link AccumuloRyaDAO}.
+     */
+    public AccumuloRyaDAO getRyaDAO() {
+        return accumuloRyaDao;
+    }
+
+    /**
+     * @return the {@link AccumuloInstanceDriver}.
+     */
+    public AccumuloInstanceDriver getAccumuloInstanceDriver() {
+        return accumuloInstanceDriver;
+    }
+
+    /**
+     * Adds an iterator setting to the statement store for it to use when it
+     * fetches statements.
+     * @param iteratorSetting the {@link IteratorSetting} to add.
+     * (not {@code null})
+     */
+    public void addIterator(final IteratorSetting iteratorSetting) {
+        checkNotNull(iteratorSetting);
+        iteratorSettings.add(iteratorSetting);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/common/InstanceType.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/common/InstanceType.java
 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/common/InstanceType.java
new file mode 100644
index 0000000..41543d6
--- /dev/null
+++ 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/common/InstanceType.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.export.accumulo.common;
+
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+
+/**
+ * The type of Accumulo instance.
+ */
+public enum InstanceType {
+    /**
+     * An Accumulo instance that runs using a regular Accumulo distribution.
+     */
+    DISTRIBUTION,
+    /**
+     * An Accumulo instance that runs using a {@link MiniAccumuloCluster}.
+     */
+    MINI,
+    /**
+     * An Accumulo instance that runs using a {@link MockInstance}.
+     */
+    MOCK;
+
+    /**
+     * Finds the instance type by name.
+     * @param name the name to find.
+     * @return the {@link InstanceType} or {@code null} if none could be found.
+     */
+    public static InstanceType fromName(String name) {
+        for (InstanceType instanceType : InstanceType.values()) {
+            if (instanceType.toString().equals(name)) {
+                return instanceType;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * @return {@code true} if the Accumulo instance is a {@link MockInstance}.
+     * {@code false} otherwise.
+     */
+    public boolean isMock() {
+        return this == MOCK;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/conf/AccumuloExportConstants.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/conf/AccumuloExportConstants.java
 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/conf/AccumuloExportConstants.java
new file mode 100644
index 0000000..ed0aa62
--- /dev/null
+++ 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/conf/AccumuloExportConstants.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.export.accumulo.conf;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.iterators.user.TimestampFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+import org.apache.rya.export.accumulo.common.InstanceType;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import mvm.rya.accumulo.mr.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Constants used for Accumulo merger exports.
+ */
+public class AccumuloExportConstants {
+    private static final Logger log = 
Logger.getLogger(AccumuloExportConstants.class);
+
+    /**
+     * Appended to certain config property names to indicate that the property 
is for the child instance.
+     */
+    public static final String CHILD_SUFFIX = ".child";
+
+    /**
+     * The {@link InstanceType} to use for Accumulo.
+     */
+    public static final String ACCUMULO_INSTANCE_TYPE_PROP = 
"ac.instance.type";
+
+    /**
+     * A value used for the {@link #START_TIME_PROP} property to indicate that 
a dialog
+     * should be displayed to select the time.
+     */
+    public static final String USE_START_TIME_DIALOG = "dialog";
+
+    public static final SimpleDateFormat START_TIME_FORMATTER = new 
SimpleDateFormat("yyyyMMddHHmmssSSSz");
+
+    /**
+     * Map of keys that are supposed to use the same values.
+     */
+    public static final ImmutableMap<String, List<String>> DUPLICATE_KEY_MAP = 
ImmutableMap.<String, List<String>>builder()
+        .put(MRUtils.AC_MOCK_PROP, 
ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE))
+        .put(MRUtils.AC_INSTANCE_PROP, 
ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE))
+        .put(MRUtils.AC_USERNAME_PROP, 
ImmutableList.of(ConfigUtils.CLOUDBASE_USER))
+        .put(MRUtils.AC_PWD_PROP, 
ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD))
+        .put(MRUtils.AC_AUTH_PROP, 
ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS, 
RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH))
+        .put(MRUtils.AC_ZK_PROP, 
ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS))
+        .put(MRUtils.TABLE_PREFIX_PROPERTY, 
ImmutableList.of(ConfigUtils.CLOUDBASE_TBL_PREFIX, 
RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX))
+        .put(MRUtils.AC_MOCK_PROP + CHILD_SUFFIX, 
ImmutableList.of(ConfigUtils.USE_MOCK_INSTANCE + CHILD_SUFFIX))
+        .put(MRUtils.AC_INSTANCE_PROP + CHILD_SUFFIX, 
ImmutableList.of(ConfigUtils.CLOUDBASE_INSTANCE + CHILD_SUFFIX))
+        .put(MRUtils.AC_USERNAME_PROP + CHILD_SUFFIX, 
ImmutableList.of(ConfigUtils.CLOUDBASE_USER + CHILD_SUFFIX))
+        .put(MRUtils.AC_PWD_PROP + CHILD_SUFFIX, 
ImmutableList.of(ConfigUtils.CLOUDBASE_PASSWORD + CHILD_SUFFIX))
+        .put(MRUtils.AC_AUTH_PROP + CHILD_SUFFIX, 
ImmutableList.of(ConfigUtils.CLOUDBASE_AUTHS + CHILD_SUFFIX, 
RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH + CHILD_SUFFIX))
+        .put(MRUtils.AC_ZK_PROP + CHILD_SUFFIX, 
ImmutableList.of(ConfigUtils.CLOUDBASE_ZOOKEEPERS + CHILD_SUFFIX))
+        .put(MRUtils.TABLE_PREFIX_PROPERTY + CHILD_SUFFIX, 
ImmutableList.of(ConfigUtils.CLOUDBASE_TBL_PREFIX + CHILD_SUFFIX, 
RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX + CHILD_SUFFIX))
+        .build();
+
+    /**
+     * Sets duplicate keys in the config.
+     * @param config the {@link Configuration}.
+     */
+    public static void setDuplicateKeys(final Configuration config) {
+        for (final Entry<String, List<String>> entry : 
DUPLICATE_KEY_MAP.entrySet()) {
+            final String key = entry.getKey();
+            final List<String> duplicateKeys = entry.getValue();
+            final String value = config.get(key);
+            if (value != null) {
+                for (final String duplicateKey : duplicateKeys) {
+                    config.set(duplicateKey, value);
+                }
+            }
+        }
+    }
+
+    /**
+     * Sets all duplicate keys for the property in the config to the specified 
value.
+     * @param config the {@link Configuration}.
+     * @param property the property to set and all its duplicates.
+     * @param value the value to set the property to.
+     */
+    public static void setDuplicateKeysForProperty(final Configuration config, 
final String property, final String value) {
+        final List<String> duplicateKeys = DUPLICATE_KEY_MAP.get(property);
+        config.set(property, value);
+        if (duplicateKeys != null) {
+            for (final String key : duplicateKeys) {
+                config.set(key, value);
+            }
+        }
+    }
+
+    /**
+     * Creates a formatted string for the start time based on the specified 
date and whether the dialog is to be displayed.
+     * @param startDate the start {@link Date} to format.
+     * @param isStartTimeDialogEnabled {@code true} to display the time dialog 
instead of using the date. {@code false}
+     * to use the provided {@code startDate}.
+     * @return the formatted start time string or {@code "dialog"}.
+     */
+    public static String getStartTimeString(final Date startDate, final 
boolean isStartTimeDialogEnabled) {
+        String startTimeString;
+        if (isStartTimeDialogEnabled) {
+            startTimeString = USE_START_TIME_DIALOG; // set start date from 
dialog box
+        } else {
+            startTimeString = convertDateToStartTimeString(startDate);
+        }
+        return startTimeString;
+    }
+
+    /**
+     * Converts the specified date into a string to use as the start time for 
the timestamp filter.
+     * @param date the start {@link Date} of the filter that will be formatted 
as a string.
+     * @return the formatted start time string.
+     */
+    public static String convertDateToStartTimeString(final Date date) {
+        final String startTimeString = START_TIME_FORMATTER.format(date);
+        return startTimeString;
+    }
+
+    /**
+     * Converts the specified string into a date to use as the start time for 
the timestamp filter.
+     * @param startTimeString the formatted time string.
+     * @return the start {@link Date}.
+     */
+    public static Date convertStartTimeStringToDate(final String 
startTimeString) {
+        Date date;
+        try {
+            date = START_TIME_FORMATTER.parse(startTimeString);
+        } catch (final ParseException e) {
+            log.error("Could not parse date", e);
+            return null;
+        }
+        return date;
+    }
+
+    /**
+     * Creates an {@link IteratorSetting} with a time stamp filter that starts 
with the specified date.
+     * @param startTimeString the start time of the filter.
+     * @return the {@link IteratorSetting}.
+     */
+    public static IteratorSetting getStartTimeSetting(final String 
startTimeString) {
+        Date date = null;
+        try {
+            date = START_TIME_FORMATTER.parse(startTimeString);
+        } catch (final ParseException e) {
+            throw new IllegalArgumentException("Couldn't parse " + 
startTimeString, e);
+        }
+        return getStartTimeSetting(date);
+    }
+
+    /**
+     * Creates an {@link IteratorSetting} with a time stamp filter that starts 
with the specified date.
+     * @param date the start {@link Date} of the filter.
+     * @return the {@link IteratorSetting}.
+     */
+    public static IteratorSetting getStartTimeSetting(final Date date) {
+        return getStartTimeSetting(date.getTime());
+    }
+
+    /**
+     * Creates an {@link IteratorSetting} with a time stamp filter that starts 
with the specified date.
+     * @param time the start time of the filter.
+     * @return the {@link IteratorSetting}.
+     */
+    public static IteratorSetting getStartTimeSetting(final long time) {
+        final IteratorSetting setting = new IteratorSetting(1, 
"startTimeIterator", TimestampFilter.class);
+        TimestampFilter.setStart(setting, time, true);
+        TimestampFilter.setEnd(setting, Long.MAX_VALUE, true);
+        return setting;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/parent/AccumuloParentMetadataRepository.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/parent/AccumuloParentMetadataRepository.java
 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/parent/AccumuloParentMetadataRepository.java
new file mode 100644
index 0000000..fbf8374
--- /dev/null
+++ 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/parent/AccumuloParentMetadataRepository.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.export.accumulo.parent;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.lexicoder.DateLexicoder;
+import org.apache.accumulo.core.client.lexicoder.Lexicoder;
+import org.apache.accumulo.core.client.lexicoder.LongLexicoder;
+import org.apache.accumulo.core.client.lexicoder.StringLexicoder;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.hadoop.io.Text;
+import org.apache.log4j.Logger;
+import org.apache.rya.export.api.MergerException;
+import org.apache.rya.export.api.parent.MergeParentMetadata;
+import org.apache.rya.export.api.parent.ParentMetadataDoesNotExistException;
+import org.apache.rya.export.api.parent.ParentMetadataExistsException;
+import org.apache.rya.export.api.parent.ParentMetadataRepository;
+
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.MRUtils;
+
+/**
+ * Accumulo repository for metadata pertaining to the parent database.  This
+ * will contain all information to identify where any data was exported from.
+ * <p>
+ * The data found here is:
+ * <li>Parent database Rya Instance Name</li>
+ * <li>Timestamp used as the lower cutoff for the export</li>
+ */
+public class AccumuloParentMetadataRepository implements 
ParentMetadataRepository {
+    private static final Logger log = 
Logger.getLogger(AccumuloParentMetadataRepository.class);
+
+    /**
+     * The Row ID of all {@link MergeParentMetadata} entries that are stored 
in Accumulo.
+     */
+    private static final Text MERGE_PARENT_METADATA_ROW_ID = new 
Text("MergeParentMetadata");
+
+    /**
+     * The Column Family for all MergeParentMetadata entries.
+     */
+    private static final Text MERGE_PARENT_METADATA_FAMILY = new 
Text("mergeParentMetadata");
+
+    /**
+     * The Column Qualifier for the Rya instance name.
+     */
+    private static final Text MERGE_PARENT_METADATA_RYA_INSTANCE_NAME = new 
Text("ryaInstanceName");
+
+    /**
+     * The Column Qualifier for the copy tool timestamp.
+     */
+    private static final Text MERGE_PARENT_METADATA_TIMESTAMP = new 
Text("timestamp");
+
+    /**
+     * The Column Qualifier for the copy tool filter timestamp.
+     */
+    private static final Text MERGE_PARENT_METADATA_FILTER_TIMESTAMP = new 
Text("filterTimestamp");
+
+    /**
+     * The Column Qualifier for the parent time offset.
+     */
+    private static final Text MERGE_PARENT_METADATA_PARENT_TIME_OFFSET = new 
Text("parentTimeOffset");
+
+    // Lexicoders used to read/write MergeParentMetadata to/from Accumulo.
+    private static final LongLexicoder LONG_LEXICODER = new LongLexicoder();
+    private static final StringLexicoder STRING_LEXICODER = new 
StringLexicoder();
+    private static final DateLexicoder DATE_LEXICODER = new DateLexicoder();
+
+    private static final String BASE_MERGE_METADATA_TABLE_NAME = 
"merge_metadata";
+
+    private final AccumuloRyaDAO accumuloRyaDao;
+    private final Connector connector;
+    private final String tablePrefix;
+    private final String mergeParentMetadataTableName;
+    private MergeParentMetadata metadata = null;
+
+    /**
+     * Creates a new instance of {@link AccumuloParentMetadataRepository}.
+     * @param accumuloRyaDao the {@link AccumuloRyaDAO}. (not {@code null})
+     */
+    public AccumuloParentMetadataRepository(final AccumuloRyaDAO 
accumuloRyaDao) {
+        this.accumuloRyaDao = checkNotNull(accumuloRyaDao);
+        connector = accumuloRyaDao.getConnector();
+        tablePrefix = accumuloRyaDao.getConf().getTablePrefix();
+        mergeParentMetadataTableName = tablePrefix + 
BASE_MERGE_METADATA_TABLE_NAME;
+    }
+
+    @Override
+    public MergeParentMetadata get() throws 
ParentMetadataDoesNotExistException {
+        if (metadata == null) {
+            metadata = getMetadataFromTable();
+        }
+        return metadata;
+    }
+
+    private MergeParentMetadata getMetadataFromTable() throws 
ParentMetadataDoesNotExistException {
+        try {
+            // Create an Accumulo scanner that iterates through the metadata 
entries.
+            final Scanner scanner = 
connector.createScanner(mergeParentMetadataTableName, new Authorizations());
+            final Iterator<Entry<Key, Value>> entries = scanner.iterator();
+
+            // No metadata has been stored in the table yet.
+            if (!entries.hasNext()) {
+                log.error("Could not find any MergeParentMetadata metadata in 
the table named: " + mergeParentMetadataTableName);
+            }
+
+            // Fetch the metadata from the entries.
+            String ryaInstanceName = null;
+            Date timestamp = null;
+            Date filterTimestamp = null;
+            Long parentTimeOffset = null;
+
+            while (entries.hasNext()) {
+                final Entry<Key, Value> entry = entries.next();
+                final Text columnQualifier = 
entry.getKey().getColumnQualifier();
+                final byte[] value = entry.getValue().get();
+
+                if 
(columnQualifier.equals(MERGE_PARENT_METADATA_RYA_INSTANCE_NAME)) {
+                    ryaInstanceName = STRING_LEXICODER.decode(value);
+                } else if 
(columnQualifier.equals(MERGE_PARENT_METADATA_TIMESTAMP)) {
+                    timestamp = DATE_LEXICODER.decode(value);
+                } else if 
(columnQualifier.equals(MERGE_PARENT_METADATA_FILTER_TIMESTAMP)) {
+                    filterTimestamp = DATE_LEXICODER.decode(value);
+                } else if 
(columnQualifier.equals(MERGE_PARENT_METADATA_PARENT_TIME_OFFSET)) {
+                    parentTimeOffset = LONG_LEXICODER.decode(value);
+                }
+            }
+
+            return new MergeParentMetadata(ryaInstanceName, timestamp, 
filterTimestamp, parentTimeOffset);
+        } catch (final TableNotFoundException e) {
+            throw new ParentMetadataDoesNotExistException("Could not add 
results to a MergeParentMetadata because the MergeParentMetadata table does not 
exist.", e);
+        } catch (final Exception e) {
+            throw new ParentMetadataDoesNotExistException("Error occurred 
while getting merge parent metadata.", e);
+        }
+    }
+
+    @Override
+    public void set(final MergeParentMetadata metadata) throws 
ParentMetadataExistsException {
+        try {
+            createTableIfNeeded();
+            writeMetadata(metadata);
+        } catch (final MergerException e) {
+            throw new ParentMetadataExistsException("Unable to set 
MergeParentMetadata", e);
+        }
+    }
+
+    private void createTableIfNeeded() throws MergerException {
+        try {
+            if (!doesMetadataTableExist()) {
+                log.debug("Creating table: " + mergeParentMetadataTableName);
+                
connector.tableOperations().create(mergeParentMetadataTableName);
+                log.debug("Created table: " + mergeParentMetadataTableName);
+                log.debug("Granting authorizations to table: " + 
mergeParentMetadataTableName);
+                final String username = 
accumuloRyaDao.getConf().get(MRUtils.AC_USERNAME_PROP);
+                connector.securityOperations().grantTablePermission(username, 
mergeParentMetadataTableName, TablePermission.WRITE);
+                log.debug("Granted authorizations to table: " + 
mergeParentMetadataTableName);
+            }
+        } catch (final TableExistsException | AccumuloException | 
AccumuloSecurityException e) {
+            throw new MergerException("Could not create a new 
MergeParentMetadata table named: " + mergeParentMetadataTableName, e);
+        }
+    }
+
+    private boolean doesMetadataTableExist() {
+        return 
connector.tableOperations().exists(mergeParentMetadataTableName);
+    }
+
+    /**
+     * Create the {@link Mutation}s required to write a
+     * {@link MergerParentMetadata} object to an Accumulo table.
+     * @param metadata - The {@link MergeParentMetadata} to write.
+     * (not {@code null})
+     * @return An ordered list of mutations that write the metadata to an
+     * Accumulo table.
+     */
+    private static List<Mutation> makeWriteMetadataMutations(final 
MergeParentMetadata metadata) {
+        checkNotNull(metadata);
+
+        final List<Mutation> mutations = new LinkedList<>();
+
+        // Rya Instance Name
+        final Mutation ryaInstanceNameMutation = 
makeFieldMutation(metadata.getRyaInstanceName(), STRING_LEXICODER, 
MERGE_PARENT_METADATA_RYA_INSTANCE_NAME);
+        mutations.add(ryaInstanceNameMutation);
+
+        // Timestamp
+        final Mutation timestampMutation= 
makeFieldMutation(metadata.getTimestamp(), DATE_LEXICODER, 
MERGE_PARENT_METADATA_TIMESTAMP);
+        mutations.add(timestampMutation);
+
+        // Filter Timestamp
+        if (metadata.getFilterTimestamp() != null) {
+            final Mutation filterTimestampMutation = 
makeFieldMutation(metadata.getFilterTimestamp(), DATE_LEXICODER, 
MERGE_PARENT_METADATA_FILTER_TIMESTAMP);
+            mutations.add(filterTimestampMutation);
+        }
+
+        // Parent Time Offset
+        if (metadata.getParentTimeOffset() != null) {
+            final Mutation parentTimeOffsetMutation = 
makeFieldMutation(metadata.getParentTimeOffset(), LONG_LEXICODER, 
MERGE_PARENT_METADATA_PARENT_TIME_OFFSET);
+            mutations.add(parentTimeOffsetMutation);
+        }
+
+        return mutations;
+    }
+
+    private static <T> Mutation makeFieldMutation(final T object, final 
Lexicoder<T> lexicoder, final Text columnQualifer) {
+        final Mutation mutation = new Mutation(MERGE_PARENT_METADATA_ROW_ID);
+        final Value value = new Value(lexicoder.encode(object));
+        mutation.put(MERGE_PARENT_METADATA_FAMILY, columnQualifer, value);
+        return mutation;
+    }
+
+    private void writeMetadata(final MergeParentMetadata metadata) throws 
MergerException {
+        BatchWriter writer = null;
+        try{
+            // Write each result.
+            final List<Mutation> mutations = 
makeWriteMetadataMutations(metadata);
+
+            writer = connector.createBatchWriter(mergeParentMetadataTableName, 
new BatchWriterConfig());
+            writer.addMutations(mutations);
+        } catch (final AccumuloException | TableNotFoundException e) {
+            throw new MergerException("Unable to set MergeParentMetadata in 
Accumulo", e);
+        } finally {
+            if (writer != null) {
+                try {
+                    writer.close();
+                } catch (final MutationsRejectedException e) {
+                    throw new MergerException("Could not add results to a 
MergeParentMetadata table because some of the mutations were rejected.", e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloInstanceDriver.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloInstanceDriver.java
 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloInstanceDriver.java
new file mode 100644
index 0000000..4c086d8
--- /dev/null
+++ 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloInstanceDriver.java
@@ -0,0 +1,603 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.export.accumulo.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.log4j.Logger;
+import org.apache.rya.export.accumulo.common.InstanceType;
+import org.apache.rya.export.accumulo.conf.AccumuloExportConstants;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.persist.RyaDAOException;
+
/**
 * Handles running a single {@link MiniAccumuloCluster} or a single {@link MockInstance} for an instance.
 */
public class AccumuloInstanceDriver {
    private static final Logger log = Logger.getLogger(AccumuloInstanceDriver.class);

    // When true, copyHadoopHomeToTemp() copies HADOOP_HOME/bin into the mini
    // cluster's temp directory on Windows (needed for winutils.exe).
    private static final boolean IS_COPY_HADOOP_HOME_ENABLED = true;

    public static final String ROOT_USER_NAME = "root";

    // Name used to identify this driver in log messages.
    private final String driverName;
    private final InstanceType instanceType;
    private final boolean isMock;
    private final boolean shouldCreateIndices;
    private final boolean isReadOnly;
    // NOTE(review): isParent is stored but never read within this class —
    // presumably kept for identification by callers; confirm before removing.
    private final boolean isParent;

    private final String user;
    private final String password;
    private final String instanceName;
    private final String tablePrefix;
    private final String auth;

    private Connector connector;

    private AccumuloRyaDAO dao;

    private SecurityOperations secOps;

    private final AccumuloRdfConfiguration config = new AccumuloRdfConfiguration();

    // Exactly one of the following three is non-null after setUpInstance(),
    // depending on the InstanceType; 'instance' aliases whichever was created.
    private MiniAccumuloCluster miniAccumuloCluster = null;

    private MockInstance mockInstance = null;

    private ZooKeeperInstance zooKeeperInstance = null;

    private Instance instance = null;

    private String zooKeepers;

    private final Map<String, String> configMap = new LinkedHashMap<>();

    private List<String> indices = null;

    private final List<String> tableList = new ArrayList<>();

    // Temp directory backing the MiniAccumuloCluster; null for other instance types.
    private File tempDir = null;

    // Suffixes of the core Rya tables created for every instance.
    public static final List<String> TABLE_NAME_SUFFIXES =
        ImmutableList.<String>of(
            RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_PO_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_NS_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_STATS_SUFFIX,
            RdfCloudTripleStoreConstants.TBL_SEL_SUFFIX
        );

    /**
     * Creates a new instance of {@link AccumuloInstanceDriver}.
     * @param driverName the name used to identify this driver in the logs. (not {@code null})
     * @param instanceType the {@link InstanceType} of this driver.
     * @param shouldCreateIndices {@code true} to create all the indices associated with a Rya deployment.
     * {@code false} otherwise.
     * @param isReadOnly {@code true} if all the tables in the instance should have their
     * table permissions set to read only.  {@code false} if the table permission are set to write.
     * @param isParent {@code true} if the instance is the parent/main instance. {@code false} if it's the
     * child.
     * @param user the user name tied to this instance.
     * @param password the password for the user.
     * @param instanceName the name of the instance.
     * @param tablePrefix the table prefix.
     * @param auth the comma-separated authorization list.
     * @param zooKeepers the comma-separated list of zoo keeper host names.
     */
    public AccumuloInstanceDriver(final String driverName, final InstanceType instanceType, final boolean shouldCreateIndices, final boolean isReadOnly, final boolean isParent, final String user, final String password, final String instanceName, final String tablePrefix, final String auth, final String zooKeepers) {
        this.driverName = Preconditions.checkNotNull(driverName);
        this.instanceType = instanceType;
        this.isMock = instanceType.isMock();
        this.shouldCreateIndices = shouldCreateIndices;
        this.isReadOnly = isReadOnly;
        this.user = user;
        this.password = password;
        this.instanceName = instanceName;
        this.tablePrefix = tablePrefix;
        this.auth = auth;
        this.zooKeepers = zooKeepers;
        this.isParent = isParent;

        config.setTablePrefix(tablePrefix);
    }

    /**
     * Sets up the {@link AccumuloInstanceDriver}: instance first, then tables,
     * DAO, and finally the configuration.  The order matters — later steps use
     * the connector and table list produced by the earlier ones.
     * @throws Exception
     */
    public void setUp() throws Exception {
        setUpInstance();
        setUpTables();
        setUpDao();
        setUpConfig();
    }

    /**
     * Sets up the {@link MiniAccumuloCluster} or the {@link MockInstance} or
     * distribution instance, and creates the {@link Connector} for it.
     * @throws Exception
     */
    public void setUpInstance() throws Exception {
        switch (instanceType) {
            case DISTRIBUTION:
                log.info("Setting up " + driverName + " distribution instance...");
                if (instanceName == null) {
                    throw new IllegalArgumentException("Must specify instance name for distributed mode");
                } else if (zooKeepers == null) {
                    throw new IllegalArgumentException("Must specify ZooKeeper hosts for distributed mode");
                }
                instance = new ZooKeeperInstance(instanceName, zooKeepers);
                connector = instance.getConnector(user, new PasswordToken(password));
                log.info("Created connector to " + driverName + " distribution instance");
                break;
            case MINI:
                log.info("Setting up " + driverName + " MiniAccumulo cluster...");
                // Create and Run MiniAccumulo Cluster
                tempDir = Files.createTempDir();
                tempDir.deleteOnExit();
                miniAccumuloCluster = new MiniAccumuloCluster(tempDir, password);
                copyHadoopHomeToTemp();
                miniAccumuloCluster.getConfig().setInstanceName(instanceName);
                log.info(driverName + " MiniAccumulo instance starting up...");
                miniAccumuloCluster.start();
                // NOTE(review): fixed 1s pause after start() — presumably gives the
                // cluster time to settle before connecting; confirm it is still needed.
                Thread.sleep(1000);
                log.info(driverName + " MiniAccumulo instance started");
                log.info("Creating connector to " + driverName + " MiniAccumulo instance...");
                zooKeeperInstance = new ZooKeeperInstance(miniAccumuloCluster.getClientConfig());
                instance = zooKeeperInstance;
                connector = zooKeeperInstance.getConnector(user, new PasswordToken(password));
                log.info("Created connector to " + driverName + " MiniAccumulo instance");
                break;
            case MOCK:
                log.info("Setting up " + driverName + " mock instance...");
                mockInstance = new MockInstance(instanceName);
                instance = mockInstance;
                connector = mockInstance.getConnector(user, new PasswordToken(password));
                log.info("Created connector to " + driverName + " mock instance");
                break;
            default:
                throw new AccumuloException("Unexpected instance type: " + instanceType);
        }
        // Record the actual zookeepers reported by the instance (may differ from input).
        zooKeepers = instance.getZooKeepers();
    }

    /**
     * Copies the HADOOP_HOME bin directory to the {@link MiniAccumuloCluster} temp directory.
     * {@link MiniAccumuloCluster} expects to find bin/winutils.exe in the MAC temp
     * directory instead of HADOOP_HOME for some reason.
     * @throws IOException
     */
    private void copyHadoopHomeToTemp() throws IOException {
        if (IS_COPY_HADOOP_HOME_ENABLED && SystemUtils.IS_OS_WINDOWS) {
            final String hadoopHomeEnv = System.getenv("HADOOP_HOME");
            if (hadoopHomeEnv != null) {
                final File hadoopHomeDir = new File(hadoopHomeEnv);
                if (hadoopHomeDir.exists()) {
                    final File binDir = Paths.get(hadoopHomeDir.getAbsolutePath(), "/bin").toFile();
                    if (binDir.exists()) {
                        FileUtils.copyDirectoryToDirectory(binDir, tempDir);
                    } else {
                        log.warn("The specified path for the Hadoop bin directory does not exist: " + binDir.getAbsolutePath());
                    }
                } else {
                    log.warn("The specified path for HADOOP_HOME does not exist: " + hadoopHomeDir.getAbsolutePath());
                }
            } else {
                log.warn("The HADOOP_HOME environment variable was not found.");
            }
        }
    }

    /**
     * Sets up all the tables and indices, creates the user if necessary, and
     * grants the appropriate table permission (READ when read-only, WRITE otherwise).
     * @throws Exception
     */
    public void setUpTables() throws Exception {
        // Setup tables and permissions
        log.info("Setting up " + driverName + " tables and permissions");
        for (final String tableSuffix : TABLE_NAME_SUFFIXES) {
            final String tableName = tablePrefix + tableSuffix;
            tableList.add(tableName);
            if (!connector.tableOperations().exists(tableName)) {
                connector.tableOperations().create(tableName);
            }
        }

        if (shouldCreateIndices) {
            // Index table names are currently disabled pending RYA-160; the list is empty.
            indices = Arrays.asList(
                /* TODO: SEE RYA-160
                ConfigUtils.getFreeTextDocTablename(config),
                ConfigUtils.getFreeTextTermTablename(config),
                ConfigUtils.getGeoTablename(config),
                ConfigUtils.getTemporalTableName(config),
                ConfigUtils.getEntityTableName(config)
                */
            );

            tableList.addAll(indices);

            log.info("Setting up " + driverName + " indices");
            for (final String index : indices) {
                if (!connector.tableOperations().exists(index)) {
                    connector.tableOperations().create(index);
                }
            }
        }

        // Setup user with authorizations
        log.info("Creating " + driverName + " user and authorizations");
        secOps = connector.securityOperations();
        if (!user.equals(ROOT_USER_NAME)) {
            secOps.createLocalUser(user, new PasswordToken(password));
        }
        addAuths(auth);
        final TablePermission tablePermission = isReadOnly ? TablePermission.READ : TablePermission.WRITE;
        for (final String tableSuffix : TABLE_NAME_SUFFIXES) {
            secOps.grantTablePermission(user, tablePrefix + tableSuffix, tablePermission);
        }
        if (shouldCreateIndices) {
            for (final String index : indices) {
                secOps.grantTablePermission(user, index, tablePermission);
            }
        }
    }

    /**
     * Sets up the {@link AccumuloRyaDAO} backed by this instance's connector and config.
     * @throws Exception
     */
    public void setUpDao() throws Exception {
        // Setup dao
        log.info("Creating " + driverName + " DAO");
        dao = new AccumuloRyaDAO();
        dao.setConnector(connector);
        dao.setConf(config);

        // Flush the tables before initializing the DAO
        for (final String tableName : tableList) {
            connector.tableOperations().flush(tableName, null, null, false);
        }

        dao.init();
    }

    /**
     * Sets up the configuration and prints the arguments.
     */
    public void setUpConfig() {
        log.info("Setting " + driverName + " config");

        // Setup config
        if (isMock) {
            configMap.put(MRUtils.AC_MOCK_PROP, Boolean.TRUE.toString());
        }
        configMap.put(MRUtils.AC_INSTANCE_PROP, instanceName);
        configMap.put(MRUtils.AC_USERNAME_PROP, user);
        configMap.put(MRUtils.AC_PWD_PROP, password);
        configMap.put(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
        configMap.put(MRUtils.AC_AUTH_PROP, auth);
        configMap.put(MRUtils.AC_ZK_PROP, zooKeepers != null ? zooKeepers : "localhost");
        configMap.put(AccumuloExportConstants.ACCUMULO_INSTANCE_TYPE_PROP, instanceType.toString());

        // Copy every entry into the AccumuloRdfConfiguration, logging each as a
        // "-Dkey=value" argument for reproducibility.
        log.info(driverName + " config properties");
        config.setTablePrefix(tablePrefix);
        for (final Entry<String, String> entry : configMap.entrySet()) {
            final String key = entry.getKey();
            final String value = entry.getValue();
            final String argument = "-D" + key + "=" + value;
            log.info(argument);
            config.set(key, value);
        }

        AccumuloExportConstants.setDuplicateKeys(config);
    }

    /**
     * Tears down all the tables and indices.
     * @throws Exception
     */
    public void tearDownTables() throws Exception {
        // delete all tables.
        if (connector != null) {
            for (final String tableName : tableList) {
                if (connector.tableOperations().exists(tableName)) {
                    connector.tableOperations().delete(tableName);
                }
            }
        }
    }

    /**
     * Tears down the {@link AccumuloRyaDAO}.  Errors during destroy are logged
     * rather than rethrown so the rest of teardown can proceed.
     * @throws Exception
     */
    public void tearDownDao() throws Exception {
        if (dao != null) {
            log.info("Stopping " + driverName + " DAO");
            try {
                dao.destroy();
            } catch (final RyaDAOException e) {
                log.error("Error stopping " + driverName + " DAO", e);
            }
            dao = null;
        }
    }

    /**
     * Tears down the instance (only the MiniAccumuloCluster needs stopping;
     * mock and distribution instances have nothing to shut down here).
     * @throws Exception
     */
    public void tearDownInstance() throws Exception {
        if (miniAccumuloCluster != null) {
            log.info("Stopping " + driverName + " cluster");
            try {
                miniAccumuloCluster.stop();
            } catch (IOException | InterruptedException e) {
                log.error("Error stopping " + driverName + " cluster", e);
            }
            miniAccumuloCluster = null;
        }
    }

    /**
     * Tears down the {@link AccumuloInstanceDriver}.  The temp directory is
     * removed even when DAO/instance teardown fails.
     * @throws Exception
     */
    public void tearDown() throws Exception {
        try {
            // NOTE(review): tearDownTables() is intentionally left disabled here —
            // presumably table deletion is unnecessary once the instance is torn
            // down; confirm before re-enabling.
            //tearDownTables();
            tearDownDao();
            tearDownInstance();
        } finally {
            removeTempDir();
        }
    }

    /**
     * Deletes the {@link MiniAccumuloCluster} temporary directory.
     */
    public void removeTempDir() {
        if (tempDir != null) {
            try {
                FileUtils.deleteDirectory(tempDir);
            } catch (final IOException e) {
                log.error("Error deleting " + driverName + " temp directory", e);
            }
            tempDir = null;
        }
    }

    /**
     * Adds authorizations to the {@link SecurityOperations} of this instance's user.
     * @param auths the list of authorizations to add.
     * @throws AccumuloException
     * @throws AccumuloSecurityException
     */
    public void addAuths(final String... auths) throws AccumuloException, AccumuloSecurityException {
        final Authorizations newAuths = AccumuloRyaUtils.addUserAuths(user, secOps, auths);
        secOps.changeUserAuthorizations(user, newAuths);
    }

    /**
     * @return the {@link Authorizations} of this instance's user, or {@code null}
     * if the security operations have not been initialized yet.
     * @throws AccumuloException
     * @throws AccumuloSecurityException
     */
    public Authorizations getAuths() throws AccumuloException, AccumuloSecurityException {
        if (secOps != null) {
            return secOps.getUserAuthorizations(user);
        } else {
            return null;
        }
    }

    /**
     * @return the {@link InstanceType}.
     */
    public InstanceType getInstanceType() {
        return instanceType;
    }

    /**
     * @return {@code true} if this is a mock instance.  {@code false} if this is a MiniAccumuloCluster instance.
     */
    public boolean isMock() {
        return isMock;
    }

    /**
     * @return {@code true} to create all the indices associated with a Rya deployment.
     * {@code false} otherwise.
     */
    public boolean shouldCreateIndices() {
        return shouldCreateIndices;
    }

    /**
     * @return {@code true} if all the tables in the instance should have their
     * table permissions set to read only.  {@code false} if the table permission are set to write.
     */
    public boolean isReadOnly() {
        return isReadOnly;
    }

    /**
     * @return the user name tied to this instance
     */
    public String getUser() {
        return user;
    }

    /**
     * @return the password for the user.
     */
    public String getPassword() {
        return password;
    }

    /**
     * @return the name of the instance.
     */
    public String getInstanceName() {
        return instanceName;
    }

    /**
     * @return the table prefix.
     */
    public String getTablePrefix() {
        return tablePrefix;
    }

    /**
     * @return the comma-separated authorization list.
     */
    public String getAuth() {
        return auth;
    }

    /**
     * @return the {@link Connector} to this instance.
     */
    public Connector getConnector() {
        return connector;
    }

    /**
     * Sets the {@link Connector} to this instance.
     * @param connector the {@link Connector}.
     */
    public void setConnector(final Connector connector) {
        this.connector = connector;
    }

    /**
     * @return the {@link AccumuloRyaDAO}.
     */
    public AccumuloRyaDAO getDao() {
        return dao;
    }

    /**
     * @return the {@link SecurityOperations}.
     */
    public SecurityOperations getSecOps() {
        return secOps;
    }

    /**
     * @return the {@link AccumuloRdfConfiguration}.
     */
    public AccumuloRdfConfiguration getConfig() {
        return config;
    }

    /**
     * @return the {@link MiniAccumuloCluster} for this instance or {@code null} if
     * this is a {@link MockInstance}.
     */
    public MiniAccumuloCluster getMiniAccumuloCluster() {
        return miniAccumuloCluster;
    }

    /**
     * @return the {@link MockInstance} for this instance or {@code null} if
     * this is a {@link MiniAccumuloCluster}.
     */
    public MockInstance getMockInstance() {
        return mockInstance;
    }

    /**
     * @return the {@link ZooKeeperInstance} for this instance or {@code null} if
     * this is a {@link MockInstance}.
     */
    public ZooKeeperInstance getZooKeeperInstance() {
        return zooKeeperInstance;
    }

    /**
     * @return the {@link ZooKeeperInstance} or {@link MockInstance}.
     */
    public Instance getInstance() {
        return instance;
    }

    /**
     * @return the comma-separated list of zoo keeper host names.
     */
    public String getZooKeepers() {
        return zooKeepers;
    }

    /**
     * @return an unmodifiable map of the configuration keys and values.
     */
    public Map<String, String> getConfigMap() {
        return Collections.unmodifiableMap(configMap);
    }

    /**
     * @return an unmodifiable list of the table names and indices.
     */
    public List<String> getTableList() {
        return Collections.unmodifiableList(tableList);
    }

    /**
     * @return the {@link MiniAccumuloCluster} temporary directory for this instance or {@code null}
     * if it's a {@link MockInstance}.
     */
    public File getTempDir() {
        return tempDir;
    }
}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/f2b046f3/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
----------------------------------------------------------------------
diff --git 
a/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
new file mode 100644
index 0000000..c425227
--- /dev/null
+++ 
b/extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/util/AccumuloRyaUtils.java
@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.export.accumulo.util;
+
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ClientConfiguration;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.admin.SecurityOperations;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.openrdf.model.ValueFactory;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;

import mvm.rya.accumulo.AccumuloRdfConfiguration;
import mvm.rya.accumulo.AccumuloRyaDAO;
import mvm.rya.accumulo.mr.MRUtils;
import mvm.rya.api.RdfCloudTripleStoreConstants;
import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import mvm.rya.api.domain.RyaStatement;
import mvm.rya.api.domain.RyaURI;
import mvm.rya.api.persist.RyaDAOException;
import mvm.rya.api.resolver.RdfToRyaConversions;
import mvm.rya.api.resolver.RyaTripleContext;
import mvm.rya.api.resolver.triple.TripleRow;
import mvm.rya.api.resolver.triple.TripleRowResolverException;
import mvm.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Utility methods for an Accumulo Rya instance.
+ */
+public final class AccumuloRyaUtils {
+    private static final Logger log = Logger.getLogger(AccumuloRyaUtils.class);
+
    /** Default namespace prepended when callers supply only a local name. */
    private static final String NAMESPACE = RdfCloudTripleStoreConstants.NAMESPACE;
    /** Shared factory used to build URI values before converting them to Rya types. */
    private static final ValueFactory VALUE_FACTORY = RdfCloudTripleStoreConstants.VALUE_FACTORY;

    /**
     * Ignore the meta statements indicating the Rya version and copy time values.
     */
    public static final ImmutableSet<IteratorSetting> COMMON_REG_EX_FILTER_SETTINGS = ImmutableSet.of(
        getVersionRegExFilterSetting()
    );

    /**
     * Private constructor to prevent instantiation of this utility class.
     */
    private AccumuloRyaUtils() {
    }
+
    /**
     * Creates a {@link RyaURI} for the specified local name, using the default
     * Rya namespace ({@link RdfCloudTripleStoreConstants#NAMESPACE}).
     * @param localName the URI's local name.
     * @return the {@link RyaURI}.
     */
    public static RyaURI createRyaUri(final String localName) {
        return createRyaUri(NAMESPACE, localName);
    }
+
    /**
     * Creates a {@link RyaURI} for the specified local name under the given
     * namespace.
     * @param namespace the namespace.
     * @param localName the URI's local name.
     * @return the {@link RyaURI}.
     */
    public static RyaURI createRyaUri(final String namespace, final String localName) {
        return RdfToRyaConversions.convertURI(VALUE_FACTORY.createURI(namespace, localName));
    }
+
    /**
     * Converts a {@link RyaURI} to the contained data string, stripping the
     * default Rya namespace.
     * @param ryaUri the {@link RyaURI} to convert.
     * @return the data value without the namespace.
     */
    public static String convertRyaUriToString(final RyaURI ryaUri) {
        return convertRyaUriToString(NAMESPACE, ryaUri);
    }
+
    /**
     * Converts a {@link RyaURI} to the contained data string, stripping the
     * given namespace. Only the first occurrence of the namespace is removed.
     * @param namespace the namespace to strip.
     * @param ryaUri the {@link RyaURI} to convert.
     * @return the data value without the namespace.
     */
    public static String convertRyaUriToString(final String namespace, final RyaURI ryaUri) {
        return StringUtils.replaceOnce(ryaUri.getData(), namespace, "");
    }
+
+    /**
+     * Creates a {@link RyaStatement} from a {@link Key}/{@link Value} pair.
+     * @param key the {@link Key}.
+     * @param value the {@link Value}.
+     * @param ryaTripleContext the {@link RyaTripleContext}.
+     * @return the converted {@link RyaStatement}.
+     * @throws TripleRowResolverException
+     */
+    public static RyaStatement createRyaStatement(final Key key, final Value 
value, final RyaTripleContext ryaTripleContext) throws 
TripleRowResolverException {
+        final byte[] row = key.getRowData() != null  && 
key.getRowData().toArray().length > 0 ? key.getRowData().toArray() : null;
+        final byte[] columnFamily = key.getColumnFamilyData() != null  && 
key.getColumnFamilyData().toArray().length > 0 ? 
key.getColumnFamilyData().toArray() : null;
+        final byte[] columnQualifier = key.getColumnQualifierData() != null  
&& key.getColumnQualifierData().toArray().length > 0 ? 
key.getColumnQualifierData().toArray() : null;
+        final Long timestamp = key.getTimestamp();
+        final byte[] columnVisibility = key.getColumnVisibilityData() != null 
&& key.getColumnVisibilityData().toArray().length > 0 ? 
key.getColumnVisibilityData().toArray() : null;
+        final byte[] valueBytes = value != null && value.get().length > 0 ? 
value.get() : null;
+        final TripleRow tripleRow = new TripleRow(row, columnFamily, 
columnQualifier, timestamp, columnVisibility, valueBytes);
+        final RyaStatement ryaStatement = 
ryaTripleContext.deserializeTriple(TABLE_LAYOUT.SPO, tripleRow);
+
+        return ryaStatement;
+    }
+
    /**
     * Creates a {@link RegExFilter} setting to ignore the version row in a table.
     * @return the {@link RegExFilter} {@link IteratorSetting}.
     */
    public static IteratorSetting getVersionRegExFilterSetting() {
        final IteratorSetting regex = new IteratorSetting(30, "version_regex", RegExFilter.class);
        // Row regex matching Rya's version metadata rows ("...urn:...#version"
        // followed by a \u0000 or \u0001 separator).
        RegExFilter.setRegexs(regex, "(.*)urn:(.*)#version[\u0000|\u0001](.*)", null, null, null, false);
        // Negate: keys that MATCH the regex are dropped; everything else passes.
        RegExFilter.setNegate(regex, true);
        return regex;
    }
+
    /**
     * Adds all the common regex filter {@link IteratorSetting}s to the provided {@link Scanner} so
     * certain metadata keys in a table are ignored.
     * @param scanner the {@link Scanner} to add the regex filter {@link IteratorSetting}s to.
     */
    public static void addCommonScannerIteratorsTo(final Scanner scanner) {
        // Currently just the version-row filter; see COMMON_REG_EX_FILTER_SETTINGS.
        for (final IteratorSetting iteratorSetting : COMMON_REG_EX_FILTER_SETTINGS) {
            scanner.addScanIterator(iteratorSetting);
        }
    }
+
    /**
     * Creates a {@link Scanner} of the provided table name using the specified {@link Configuration}.
     * This applies common iterator settings to the table scanner that ignore internal metadata keys.
     * @param tableName the name of the table to scan.
     * @param config the {@link Configuration}.
     * @return the {@link Scanner} for the table.
     * @throws IOException if connecting to the table or creating the scanner fails.
     */
    public static Scanner getScanner(final String tableName, final Configuration config) throws IOException {
        return getScanner(tableName, config, true);
    }
+
+    /**
+     * Creates a {@link Scanner} of the provided table name using the 
specified {@link Configuration}.
+     * @param tablename the name of the table to scan.
+     * @param config the {@link Configuration}.
+     * @param shouldAddCommonIterators {@code true} to add the common 
iterators to the table scanner.
+     * {@code false} otherwise.
+     * @return the {@link Scanner} for the table.
+     * @throws IOException
+     */
+    public static Scanner getScanner(final String tableName, final 
Configuration config, final boolean shouldAddCommonIterators) throws 
IOException {
+        try {
+            final String instanceName = 
config.get(ConfigUtils.CLOUDBASE_INSTANCE);
+            final String zooKeepers = 
config.get(ConfigUtils.CLOUDBASE_ZOOKEEPERS);
+            Instance instance;
+            if (ConfigUtils.useMockInstance(config)) {
+                instance = new MockInstance(instanceName);
+            } else {
+                instance = new ZooKeeperInstance(new 
ClientConfiguration().withInstance(instanceName).withZkHosts(zooKeepers));
+            }
+            final String username = ConfigUtils.getUsername(config);
+            final String password = ConfigUtils.getPassword(config);
+            final Connector connector = instance.getConnector(username, new 
PasswordToken(password));
+            final Authorizations auths = ConfigUtils.getAuthorizations(config);
+
+            final Scanner scanner = connector.createScanner(tableName, auths);
+            if (shouldAddCommonIterators) {
+                AccumuloRyaUtils.addCommonScannerIteratorsTo(scanner);
+            }
+            return scanner;
+        } catch (AccumuloException | AccumuloSecurityException | 
TableNotFoundException e) {
+            log.error("Error connecting to " + tableName);
+            throw new IOException(e);
+        }
+    }
+
    /**
     * Prints the table with the specified config and additional settings.
     * This applies common iterator settings to the table scanner that ignore internal metadata keys.
     * @param tableName the name of the table to print.
     * @param config the {@link AccumuloRdfConfiguration}.
     * @param settings the additional {@link IteratorSetting}s to add besides the common ones.
     * @throws IOException if the table cannot be scanned.
     */
    public static void printTable(final String tableName, final AccumuloRdfConfiguration config, final IteratorSetting... settings) throws IOException {
        printTable(tableName, config, true, settings);
    }
+
+    /**
+     * Prints the table with the specified config and additional settings.
+     * @param tableName the name of the table to print.
+     * @param config the {@link AccumuloRdfConfiguration}.
+     * @param shouldAddCommonIterators {@code true} to add the common 
iterators to the table scanner.
+     * {@code false} otherwise.
+     * @param settings the additional {@link IteratorSetting}s to add besides 
the common ones.
+     * @throws IOException
+     */
+    public static void printTable(final String tableName, final 
AccumuloRdfConfiguration config, final boolean shouldAddCommonIterators, final 
IteratorSetting... settings) throws IOException {
+        final Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config, 
shouldAddCommonIterators);
+        for (final IteratorSetting setting : settings) {
+            scanner.addScanIterator(setting);
+        }
+
+        final Iterator<Entry<Key, Value>> iterator = scanner.iterator();
+
+        final String instance = config.get(MRUtils.AC_INSTANCE_PROP);
+        log.info("==================");
+        log.info("TABLE: " + tableName + " INSTANCE: " + instance);
+        log.info("------------------");
+        while (iterator.hasNext()) {
+            final Entry<Key, Value> entry = iterator.next();
+            final Key key = entry.getKey();
+            final Value value = entry.getValue();
+            final String keyString = getFormattedKeyString(key);
+            log.info(keyString + " - " + value);
+        }
+        log.info("==================");
+    }
+
+    private static String getFormattedKeyString(final Key key) {
+        final StringBuilder sb = new StringBuilder();
+        final byte[] row = key.getRow().getBytes();
+        final byte[] colFamily = key.getColumnFamily().getBytes();
+        final byte[] colQualifier = key.getColumnQualifier().getBytes();
+        final byte[] colVisibility = key.getColumnVisibility().getBytes();
+        final int maxRowDataToPrint = 256;
+        Key.appendPrintableString(row, 0, row.length, maxRowDataToPrint, sb);
+        sb.append(" ");
+        Key.appendPrintableString(colFamily, 0, colFamily.length, 
maxRowDataToPrint, sb);
+        sb.append(":");
+        Key.appendPrintableString(colQualifier, 0, colQualifier.length, 
maxRowDataToPrint, sb);
+        sb.append(" [");
+        Key.appendPrintableString(colVisibility, 0, colVisibility.length, 
maxRowDataToPrint, sb);
+        sb.append("]");
+        sb.append(" ");
+        sb.append(new Date(key.getTimestamp()));
+        //sb.append(Long.toString(key.getTimestamp()));
+        //sb.append(" ");
+        //sb.append(key.isDeleted());
+        return sb.toString();
+    }
+
    /**
     * Prints the table with pretty formatting using the specified config and additional settings.
     * This applies common iterator settings to the table scanner that ignore internal metadata keys.
     * @param tableName the name of the table to print.
     * @param config the {@link Configuration}.
     * @param settings the additional {@link IteratorSetting}s to add besides the common ones.
     * @throws IOException if the table cannot be scanned.
     */
    public static void printTablePretty(final String tableName, final Configuration config, final IteratorSetting... settings) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException {
        printTablePretty(tableName, config, true, settings);
    }
+
    /**
     * Prints the table with pretty formatting using the specified config and additional settings.
     * @param tableName the name of the table to print.
     * @param config the {@link Configuration}.
     * @param shouldAddCommonIterators {@code true} to add the common iterators to the table scanner.
     * {@code false} otherwise.
     * @param settings the additional {@link IteratorSetting}s to add besides the common ones.
     * @throws IOException if the table cannot be scanned.
     */
    public static void printTablePretty(final String tableName, final Configuration config, final boolean shouldAddCommonIterators, final IteratorSetting... settings) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, IOException {
        final Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config, shouldAddCommonIterators);
        for (final IteratorSetting setting : settings) {
            scanner.addScanIterator(setting);
        }

        // Column layout: Row | ColumnVisibility | Timestamp | ColumnFamily | ColumnQualifier | Value.
        final String format = "| %-64s | %-24s | %-28s | %-20s | %-20s | %-10s |";
        // Measure a rendered line so the border rows can be padded to the same width.
        final int totalFormatLength = String.format(format, 1, 2, 3, 4, 5, 6).length();
        final String instance = config.get(MRUtils.AC_INSTANCE_PROP);
        log.info(StringUtils.rightPad("==================", totalFormatLength, "="));
        log.info(StringUtils.rightPad("| TABLE: " + tableName + " INSTANCE: " + instance, totalFormatLength - 1) + "|");
        log.info(StringUtils.rightPad("------------------", totalFormatLength, "-"));
        log.info(String.format(format, "--Row--", "--ColumnVisibility--", "--Timestamp--", "--ColumnFamily--", "--ColumnQualifier--", "--Value--"));
        log.info(StringUtils.rightPad("|-----------------", totalFormatLength - 1, "-") + "|");
        for (final Entry<Key, Value> entry : scanner) {
            final Key k = entry.getKey();
            final String rowString = Key.appendPrintableString(k.getRow().getBytes(), 0, k.getRow().getLength(), Constants.MAX_DATA_TO_PRINT, new StringBuilder()).toString();
            log.info(String.format(format, rowString, k.getColumnVisibility(), new Date(k.getTimestamp()), k.getColumnFamily(), k.getColumnQualifier(), entry.getValue()));
        }
        log.info(StringUtils.rightPad("==================", totalFormatLength, "="));
    }
+
+    /**
+     * Adds authorizations to a user's authorizations list.
+     * @param user the name of the user to add authorizations for.
+     * @param secOps the {@link SecurityOperations}.
+     * @param auths the {@link Authorizations} to add
+     * @return the {@link Authorizations}.
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     */
+    public static Authorizations addUserAuths(final String user, final 
SecurityOperations secOps, final Authorizations auths) throws 
AccumuloException, AccumuloSecurityException {
+        final List<String> authList = new ArrayList<>();
+        for (final byte[] authBytes : auths.getAuthorizations()) {
+            final String auth = new String(authBytes);
+            authList.add(auth);
+        }
+        return addUserAuths(user, secOps, authList.toArray(new String[0]));
+    }
+
+    /**
+     * Adds authorizations to a user's authorizations list.
+     * @param user the name of the user to add authorizations for.
+     * @param secOps the {@link SecurityOperations}.
+     * @param auths the list of authorizations to add
+     * @return the {@link Authorizations}.
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     */
+    public static Authorizations addUserAuths(final String user, final 
SecurityOperations secOps, final String... auths) throws AccumuloException, 
AccumuloSecurityException {
+        final Authorizations currentUserAuths = 
secOps.getUserAuthorizations(user);
+        final List<byte[]> authList = new ArrayList<>();
+        for (final byte[] currentAuth : currentUserAuths.getAuthorizations()) {
+            authList.add(currentAuth);
+        }
+        for (final String newAuth : auths) {
+            authList.add(newAuth.getBytes());
+        }
+        final Authorizations result = new Authorizations(authList);
+        return result;
+    }
+
+    /**
+     * Removes the specified authorizations from the user.
+     * @param userName the name of the user to change authorizations for.
+     * @param secOps the {@link SecurityOperations} to change.
+     * @param authsToRemove the comma-separated string of authorizations to 
remove.
+     * @throws AccumuloSecurityException
+     * @throws AccumuloException
+     */
+    public static void removeUserAuths(final String userName, final 
SecurityOperations secOps, final String authsToRemove) throws 
AccumuloException, AccumuloSecurityException {
+        final Authorizations currentUserAuths = 
secOps.getUserAuthorizations(userName);
+        final List<String> authList = 
convertAuthStringToList(currentUserAuths.toString());
+
+        final List<String> authsToRemoveList = 
convertAuthStringToList(authsToRemove);
+        authList.removeAll(authsToRemoveList);
+
+        final String authString = Joiner.on(",").join(authList);
+        final Authorizations newAuths = new Authorizations(authString);
+
+        secOps.changeUserAuthorizations(userName, newAuths);
+    }
+
+    /**
+     * Convert the comma-separated string of authorizations into a list of 
authorizations.
+     * @param authString the comma-separated string of authorizations.
+     * @return a {@link List} of authorization strings.
+     */
+    public static List<String> convertAuthStringToList(final String 
authString) {
+        final List<String> authList = new ArrayList<>();
+        if (authString != null) {
+            final String[] authSplit = authString.split(",");
+            authList.addAll(Arrays.asList(authSplit));
+        }
+        return authList;
+    }
+
+    /**
+     * Sets up a {@link Connector} with the specified config.
+     * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}.
+     * @return the {@link Connector}.
+     */
+    public static Connector setupConnector(final AccumuloRdfConfiguration 
accumuloRdfConfiguration) {
+        Connector connector = null;
+        try {
+            connector = ConfigUtils.getConnector(accumuloRdfConfiguration);
+        } catch (AccumuloException | AccumuloSecurityException e) {
+            log.error("Error creating connector", e);
+        }
+
+        return connector;
+    }
+
    /**
     * Sets up a {@link AccumuloRyaDAO} using a connector created from the
     * specified config.
     * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}.
     * @return the {@link AccumuloRyaDAO}.
     */
    public static AccumuloRyaDAO setupDao(final AccumuloRdfConfiguration accumuloRdfConfiguration) {
        final Connector connector = setupConnector(accumuloRdfConfiguration);
        return setupDao(connector, accumuloRdfConfiguration);
    }
+
    /**
     * Sets up a {@link AccumuloRyaDAO} with the specified connector and config.
     * Note: initialization failures are logged and swallowed, so the returned
     * DAO may not have been successfully initialized.
     * @param connector the {@link Connector}.
     * @param accumuloRdfConfiguration the {@link AccumuloRdfConfiguration}.
     * @return the {@link AccumuloRyaDAO}.
     */
    public static AccumuloRyaDAO setupDao(final Connector connector, final AccumuloRdfConfiguration accumuloRdfConfiguration) {
        final AccumuloRyaDAO accumuloRyaDao = new AccumuloRyaDAO();
        accumuloRyaDao.setConnector(connector);
        accumuloRyaDao.setConf(accumuloRdfConfiguration);

        try {
            accumuloRyaDao.init();
        } catch (final RyaDAOException e) {
            // Deliberately best-effort: the DAO is still returned for the caller.
            log.error("Error initializing DAO", e);
        }

        return accumuloRyaDao;
    }
+}
\ No newline at end of file


Reply via email to