http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java
new file mode 100644
index 0000000..ca4d0c2
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseCopyToolMapper.java
@@ -0,0 +1,260 @@
+package mvm.rya.accumulo.mr.merge.mappers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableExistsException;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.mapreduce.lib.partition.KeyRangePartitioner;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.merge.CopyTool;
+import mvm.rya.accumulo.mr.merge.MergeTool;
+import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaTripleContext;
+
+/**
+ * The base {@link Mapper} for the copy tool, which initializes the mapper for use. The mapper will take all
+ * keys from the parent table that are after the provided start time and copy them to the child table.
+ */ +public class BaseCopyToolMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> { + private static final Logger log = Logger.getLogger(BaseCopyToolMapper.class); + + protected String startTimeString; + protected Date startTime; + protected Date runTime; + protected Long timeOffset; + protected boolean useCopyFileOutput; + + protected String parentTableName; + protected String childTableName; + protected String parentTablePrefix; + protected String childTablePrefix; + protected Text childTableNameText; + + protected Configuration parentConfig; + protected Configuration childConfig; + + protected String parentUser; + protected String childUser; + + protected Connector parentConnector; + protected Connector childConnector; + + protected AccumuloRdfConfiguration parentAccumuloRdfConfiguration; + protected AccumuloRdfConfiguration childAccumuloRdfConfiguration; + + protected RyaTripleContext childRyaContext; + + protected AccumuloRyaDAO childDao; + + /** + * Creates a new {@link BaseCopyToolMapper}. + */ + public BaseCopyToolMapper() { + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + + log.info("Setting up mapper"); + + parentConfig = context.getConfiguration(); + childConfig = MergeToolMapper.getChildConfig(parentConfig); + + startTimeString = parentConfig.get(MergeTool.START_TIME_PROP, null); + if (startTimeString != null) { + startTime = MergeTool.convertStartTimeStringToDate(startTimeString); + } + + String runTimeString = parentConfig.get(CopyTool.COPY_RUN_TIME_PROP, null); + if (runTimeString != null) { + runTime = MergeTool.convertStartTimeStringToDate(runTimeString); + } + + String offsetString = parentConfig.get(CopyTool.PARENT_TIME_OFFSET_PROP, null); + if (offsetString != null) { + timeOffset = Long.valueOf(offsetString); + } + + useCopyFileOutput = parentConfig.getBoolean(CopyTool.USE_COPY_FILE_OUTPUT, false); + + parentTableName = parentConfig.get(MergeTool.TABLE_NAME_PROP, null); + parentTablePrefix = parentConfig.get(MRUtils.TABLE_PREFIX_PROPERTY, null); + childTablePrefix = childConfig.get(MRUtils.TABLE_PREFIX_PROPERTY, null); + childTableName = parentTableName.replaceFirst(parentTablePrefix, childTablePrefix); + childTableNameText = new Text(childTableName); + log.info("Copying data from parent table, \"" + parentTableName + "\", to child table, \"" + childTableName + "\""); + + parentUser = parentConfig.get(MRUtils.AC_USERNAME_PROP, null); + childUser = childConfig.get(MRUtils.AC_USERNAME_PROP, null); + + parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(parentConfig); + parentAccumuloRdfConfiguration.setTablePrefix(parentTablePrefix); + parentConnector = AccumuloRyaUtils.setupConnector(parentAccumuloRdfConfiguration); + + childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig); + childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix); + childRyaContext = RyaTripleContext.getInstance(childAccumuloRdfConfiguration); + + + if (useCopyFileOutput) { + fixSplitsInCachedLocalFiles(); + } else { + childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration); + childDao = AccumuloRyaUtils.setupDao(childConnector, childAccumuloRdfConfiguration); + + createTableIfNeeded(); + + copyAuthorizations(); + } + + // Add the run time and split time to the table + addMetadataKeys(context); + + log.info("Finished setting up mapper"); + } + + /** + * Fixes the "splits.txt" file path in the "mapreduce.job.cache.local.files" 
property. It contains the
+     * {@link URI} "file:" prefix, which causes {@link KeyRangePartitioner} to throw a {@code FileNotFoundException}
+     * when it attempts to open it.
+     */
+    private void fixSplitsInCachedLocalFiles() {
+        if (useCopyFileOutput) {
+            // The "mapreduce.job.cache.local.files" property contains a comma-separated
+            // list of cached local file paths.
+            String cachedLocalFiles = parentConfig.get(MRJobConfig.CACHE_LOCALFILES);
+            if (cachedLocalFiles != null) {
+                List<String> cachedLocalFilesList = Lists.newArrayList(Splitter.on(',').split(cachedLocalFiles));
+                List<String> formattedCachedLocalFilesList = new ArrayList<>();
+                for (String cachedLocalFile : cachedLocalFilesList) {
+                    String pathToAdd = cachedLocalFile;
+                    if (cachedLocalFile.endsWith("splits.txt")) {
+                        URI uri = null;
+                        try {
+                            // Parse this individual cache entry (not the whole comma-separated list)
+                            // and keep only its path, dropping the "file:" scheme.
+                            uri = new URI(cachedLocalFile);
+                            pathToAdd = uri.getPath();
+                        } catch (URISyntaxException e) {
+                            log.error("Invalid syntax in local cache file path", e);
+                        }
+                    }
+                    formattedCachedLocalFilesList.add(pathToAdd);
+                }
+                String formattedCachedLocalFiles = Joiner.on(',').join(formattedCachedLocalFilesList);
+                if (!cachedLocalFiles.equals(formattedCachedLocalFiles)) {
+                    parentConfig.set(MRJobConfig.CACHE_LOCALFILES, formattedCachedLocalFiles);
+                }
+            }
+        }
+    }
+
+    protected void addMetadataKeys(Context context) throws IOException {
+        try {
+            if (AccumuloRyaUtils.getCopyToolRunDate(childDao) == null) {
+                log.info("Writing copy tool run time metadata to child table: " + runTime);
+                AccumuloRyaUtils.setCopyToolRunDate(runTime, childDao);
+            }
+            if (AccumuloRyaUtils.getCopyToolSplitDate(childDao) == null) {
+                log.info("Writing copy split time metadata to child table: " + startTime);
+                AccumuloRyaUtils.setCopyToolSplitDate(startTime, childDao);
+            }
+
+            if (timeOffset != null) {
+                log.info("Writing copy tool time offset metadata to child table: " + timeOffset);
+                AccumuloRyaUtils.setTimeOffset(timeOffset, childDao);
+            }
+        } catch (RyaDAOException e) {
+            throw new IOException("Failed to set time metadata key for table: " + childTableName, e);
+        }
+    }
+
+    private void createTableIfNeeded() throws IOException {
+        try {
+            if (!childConnector.tableOperations().exists(childTableName)) {
+                log.info("Creating table: " + childTableName);
+                childConnector.tableOperations().create(childTableName);
+                log.info("Created table: " + childTableName);
+                log.info("Granting write permission on table: " + childTableName);
+                childConnector.securityOperations().grantTablePermission(childUser, childTableName, TablePermission.WRITE);
+                log.info("Granted write permission on table: " + childTableName);
+            }
+        } catch (TableExistsException | AccumuloException | AccumuloSecurityException e) {
+            throw new IOException(e);
+        }
+    }
+
+    protected void copyAuthorizations() throws IOException {
+        try {
+            SecurityOperations parentSecOps = parentConnector.securityOperations();
+            SecurityOperations childSecOps = childConnector.securityOperations();
+
+            Authorizations parentAuths = parentSecOps.getUserAuthorizations(parentUser);
+            Authorizations childAuths = childSecOps.getUserAuthorizations(childUser);
+            // Add any parent authorizations that the child doesn't have.
+ if (!childAuths.equals(parentAuths)) { + log.info("Adding the authorization, \"" + parentAuths.toString() + "\", to the child user, \"" + childUser + "\""); + Authorizations newChildAuths = AccumuloRyaUtils.addUserAuths(childUser, childSecOps, parentAuths); + childSecOps.changeUserAuthorizations(childUser, newChildAuths); + } + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException(e); + } + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + super.cleanup(context); + log.info("Cleaning up mapper..."); + try { + if (childDao != null) { + childDao.destroy(); + } + } catch (RyaDAOException e) { + log.error("Error destroying child DAO", e); + } + log.info("Cleaned up mapper"); + } +} \ No newline at end of file
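[Editorial illustration] The path normalization that fixSplitsInCachedLocalFiles performs above can be shown in isolation. Below is a minimal, standalone sketch using only JDK classes; the class name SplitsPathDemo is hypothetical, and where the production code uses Guava's Splitter/Joiner this sketch uses String.split and a StringBuilder. It strips the "file:" scheme from any splits.txt entry in a comma-separated cache-file list, which is what lets KeyRangePartitioner open the splits file with plain file I/O.

import java.net.URI;
import java.net.URISyntaxException;

public class SplitsPathDemo {
    // Strips the "file:" scheme from any splits.txt entry in a comma-separated
    // list of cached local files, mirroring fixSplitsInCachedLocalFiles above.
    static String normalize(String cachedLocalFiles) {
        StringBuilder formatted = new StringBuilder();
        for (String path : cachedLocalFiles.split(",")) {
            if (path.endsWith("splits.txt")) {
                try {
                    // "file:/tmp/job_1/splits.txt" -> "/tmp/job_1/splits.txt"
                    String uriPath = new URI(path).getPath();
                    if (uriPath != null) {
                        path = uriPath;
                    }
                } catch (URISyntaxException e) {
                    // Keep the original entry if it isn't a valid URI.
                }
            }
            if (formatted.length() > 0) {
                formatted.append(',');
            }
            formatted.append(path);
        }
        return formatted.toString();
    }

    public static void main(String[] args) {
        System.out.println(normalize("file:/tmp/job_1/splits.txt,/tmp/job_1/lib.jar"));
        // Prints: /tmp/job_1/splits.txt,/tmp/job_1/lib.jar
    }
}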
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java new file mode 100644 index 0000000..a0abe0d --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/BaseRuleMapper.java @@ -0,0 +1,222 @@ +package mvm.rya.accumulo.mr.merge.mappers; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.util.List; + +import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.openrdf.query.QueryEvaluationException; +import org.openrdf.query.algebra.Or; +import org.openrdf.query.algebra.ValueExpr; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.mr.merge.MergeTool; +import mvm.rya.accumulo.mr.merge.util.AccumuloQueryRuleset; +import mvm.rya.accumulo.mr.merge.util.CopyRule; +import mvm.rya.accumulo.mr.merge.util.QueryRuleset.QueryRulesetException; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.RdfCloudTripleStoreUtils; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaToRdfConversions; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolver; +import mvm.rya.api.resolver.triple.TripleRowResolverException; +import mvm.rya.api.resolver.triple.impl.WholeRowTripleResolver; +import mvm.rya.rdftriplestore.evaluation.ParallelEvaluationStrategyImpl; + +/** + * Take in rows from a table and range defined by query-based rules, convert the rows to + * statements based on the table name, and process those statements that match the rule(s). + */ +public abstract class BaseRuleMapper<KEYOUT, VALUEOUT> extends BaseCopyToolMapper<Key, Value, KEYOUT, VALUEOUT> { + /** + * Hadoop counters for tracking the number of statements and/or raw rows that have been processed. 
+     */
+    public static enum Counters { STATEMENTS_COPIED, DIRECT_ROWS_COPIED };
+
+    private static final Logger log = Logger.getLogger(BaseRuleMapper.class);
+
+    private TripleRowResolver resolver = new WholeRowTripleResolver();
+    private TABLE_LAYOUT parentLayout = null;
+    private ValueExpr condition;
+    private ParallelEvaluationStrategyImpl strategy;
+    private RangeInputSplit split;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+        split = (RangeInputSplit) context.getInputSplit();
+        Range range = split.getRange();
+
+        // Determine the table and table layout we're scanning
+        parentTableName = split.getTableName();
+        parentTablePrefix = conf.get(MRUtils.TABLE_PREFIX_PROPERTY);
+        for (TABLE_LAYOUT layout : TABLE_LAYOUT.values()) {
+            String tableName = RdfCloudTripleStoreUtils.layoutPrefixToTable(layout, parentTablePrefix);
+            if (tableName.equals(parentTableName)) {
+                parentLayout = layout;
+            }
+        }
+        conf.set(MergeTool.TABLE_NAME_PROP, parentTableName);
+
+        // Set up connections and parent/child table information, if necessary
+        super.setup(context);
+
+        // If we're working at the statement level, get the relevant rules and conditions:
+        if (parentLayout != null) {
+            AccumuloQueryRuleset ruleset;
+            try {
+                ruleset = new AccumuloQueryRuleset(new AccumuloRdfConfiguration(conf));
+            } catch (QueryRulesetException e) {
+                throw new IOException("Error parsing the input query", e);
+            }
+
+            List<CopyRule> rules = ruleset.getRules(parentLayout, range);
+
+            for (CopyRule rule : rules) {
+                log.info("Mapper applies to rule:");
+                for (String line : rule.toString().split("\n")) {
+                    log.info("\t" + line);
+                }
+            }
+
+            // Combine the rules' conditions so that if any of the individual conditions matches, the
+            // composite condition will match as well. We know all the rules match all the statements
+            // this input split will receive, so if any condition is true we'll want to copy the statement.
+            for (CopyRule rule : rules) {
+                // Attach any relevant filter conditions given by this rule.
+                // If there are no conditions, all statements read by this mapper should be accepted
+                // (even if there are redundant rules with conditions)
+                if (rule.getCondition() == null) {
+                    condition = null;
+                    break;
+                }
+                // If this is the first rule with a condition, matching its condition means we should accept the statement.
+                else if (condition == null) {
+                    condition = rule.getCondition();
+                }
+                // If more than one rule matches, satisfying any one rule's conditions means we should accept.
+ else { + condition = new Or(condition, rule.getCondition()); + } + } + + // Set up the strategy to evaluate those conditions + strategy = new ParallelEvaluationStrategyImpl(null, null, null, childAccumuloRdfConfiguration); + + // Log info about the split and combined condition + log.info("Table: " + parentTableName); + log.info("Range:"); + log.info("\tfrom " + keyToString(range.getStartKey(), Integer.MAX_VALUE)); + log.info("\tto " + keyToString(range.getEndKey(), Integer.MAX_VALUE)); + if (condition == null) { + log.info("Condition: none"); + } + else { + log.info("Condition:"); + for (String line : condition.toString().split("\n")) { + log.info("\t" + line); + } + } + } + + else { + log.info("(Copying all rows from " + parentTableName + " directly.)"); + } + } + + @Override + protected void map(Key key, Value value, Context context) throws IOException, InterruptedException { + TripleRow row = new TripleRow(key.getRowData().toArray(), key.getColumnFamilyData().toArray(), + key.getColumnQualifierData().toArray(), key.getTimestamp(), + key.getColumnVisibilityData().toArray(), value == null ? null : value.get()); + try { + // If there's no layout, copy the row directly + if (parentLayout == null) { + copyRow(key, value, context); + context.getCounter(Counters.DIRECT_ROWS_COPIED).increment(1); + } + // If there is a layout, deserialize the statement and insert it if it meets the condition + else { + RyaStatement rs = resolver.deserialize(parentLayout, row); + if (condition == null || CopyRule.accept(RyaToRdfConversions.convertStatement(rs), condition, strategy)) { + copyStatement(rs, context); + context.getCounter(Counters.STATEMENTS_COPIED).increment(1); + } + } + } catch (TripleRowResolverException e) { + throw new IOException("Error deserializing triple", e); + } catch (QueryEvaluationException e) { + throw new IOException("Error evaluating the filter condition", e); + } + } + + /** + * Copy a Statement, serializing it and/or indexing it as necessary. + * @param rstmt RyaStatement to copy to the child + * @param context Context to use for writing + * @throws InterruptedException If the Hadoop framework reports an interrupt + * @throws IOException If any error is encountered while serializing or writing the statement + */ + abstract protected void copyStatement(RyaStatement rstmt, Context context) throws IOException, InterruptedException; + + /** + * Copy a row directly, as opposed to starting with a higher-level object and serializing it. + * @param key Row's key + * @param value Row's value + * @param context Context to use for writing + * @throws InterruptedException If the Hadoop framework reports an interrupt + * @throws IOException If an error is encountered writing the row + */ + abstract protected void copyRow(Key key, Value value, Context context) throws IOException, InterruptedException; + + /** + * Get a printable representation of a Key, with parts truncated to a parameterized length. + * (Key.toString() truncates to a fixed length that is sometimes too short to usefully log ranges). 
+ * @param key Any Accumulo Key + * @param max The maximum printed length of each individual portion + * @return A human-readable representation of the Key + */ + private static String keyToString(Key key, int max) { + StringBuilder sb = new StringBuilder(); + byte[] row = key.getRow().copyBytes(); + byte[] colFamily = key.getColumnFamily().copyBytes(); + byte[] colQualifier = key.getColumnQualifier().copyBytes(); + byte[] colVisibility = key.getColumnVisibility().copyBytes(); + Key.appendPrintableString(row, 0, row.length, max, sb); + sb.append(" "); + Key.appendPrintableString(colFamily, 0, colFamily.length, max, sb); + sb.append(":"); + Key.appendPrintableString(colQualifier, 0, colQualifier.length, max, sb); + sb.append(" ["); + Key.appendPrintableString(colVisibility, 0, colVisibility.length, max, sb); + sb.append("] "); + sb.append(Long.toString(key.getTimestamp())); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java new file mode 100644 index 0000000..7a306f2 --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/FileCopyToolMapper.java @@ -0,0 +1,81 @@ +package mvm.rya.accumulo.mr.merge.mappers; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.util.Map; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.log4j.Logger; + +import mvm.rya.accumulo.AccumuloRdfUtils; +import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +/** + * Extended {@link BaseCopyToolMapper} that handles the {@code AccumuloFileOutputFormat} for the copy tool. + */ +public class FileCopyToolMapper extends BaseCopyToolMapper<Key, Value, Key, Value> { + private static final Logger log = Logger.getLogger(FileCopyToolMapper.class); + + /** + * Creates a new {@link FileCopyToolMapper}. 
+ */ + public FileCopyToolMapper() { + } + + @Override + protected void addMetadataKeys(Context context) throws IOException { + try { + if (runTime != null) { + log.info("Writing copy tool run time metadata to child table: " + runTime); + RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolRunTimeRyaStatement(runTime); + writeRyaStatement(ryaStatement, context); + } + + if (startTime != null) { + log.info("Writing copy split time metadata to child table: " + startTime); + RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolSplitTimeRyaStatement(startTime); + writeRyaStatement(ryaStatement, context); + } + + if (timeOffset != null) { + log.info("Writing copy tool time offset metadata to child table: " + timeOffset); + RyaStatement ryaStatement = AccumuloRyaUtils.createTimeOffsetRyaStatement(timeOffset); + writeRyaStatement(ryaStatement, context); + } + } catch (TripleRowResolverException | IOException | InterruptedException e) { + throw new IOException("Failed to write metadata key", e); + } + } + + private void writeRyaStatement(RyaStatement ryaStatement, Context context) throws TripleRowResolverException, IOException, InterruptedException { + Map<TABLE_LAYOUT, TripleRow> serialize = childRyaContext.getTripleResolver().serialize(ryaStatement); + TripleRow tripleRow = serialize.get(TABLE_LAYOUT.SPO); + Key key = AccumuloRdfUtils.from(tripleRow); + Value value = AccumuloRdfUtils.extractValue(tripleRow); + context.write(key, value); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/MergeToolMapper.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/MergeToolMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/MergeToolMapper.java new file mode 100644 index 0000000..0b5dddd --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/MergeToolMapper.java @@ -0,0 +1,575 @@ +package mvm.rya.accumulo.mr.merge.mappers; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L%
+ */
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.mapreduce.RangeInputSplit;
+import org.apache.accumulo.core.data.ColumnUpdate;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.security.ColumnVisibility;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Charsets;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRdfConstants;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.RyaTableMutationsFactory;
+import mvm.rya.accumulo.mr.merge.CopyTool;
+import mvm.rya.accumulo.mr.merge.MergeTool;
+import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils;
+import mvm.rya.accumulo.mr.merge.util.TimeUtils;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConfiguration;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
+import mvm.rya.api.domain.RyaStatement;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.api.resolver.RyaTripleContext;
+import mvm.rya.api.resolver.triple.TripleRow;
+import mvm.rya.api.resolver.triple.TripleRowResolverException;
+
+/**
+ * Reads from the parent and child tables, comparing their keys, and adds keys to or deletes keys
+ * from the parent as necessary in order to reflect changes that the child made since the provided
+ * start time.
+ */
+public class MergeToolMapper extends Mapper<Key, Value, Text, Mutation> {
+    private static final Logger log = Logger.getLogger(MergeToolMapper.class);
+
+    private boolean usesStartTime;
+    private String startTimeString;
+    private Date startTime;
+    private String parentTableName;
+    private String childTableName;
+    private String parentTablePrefix;
+    private String childTablePrefix;
+    private Text spoTable;
+    private Text poTable;
+    private Text ospTable;
+
+    private Context context;
+
+    private Configuration parentConfig;
+    private Configuration childConfig;
+
+    private AccumuloRdfConfiguration parentAccumuloRdfConfiguration;
+    private AccumuloRdfConfiguration childAccumuloRdfConfiguration;
+
+    private RyaTripleContext parentRyaContext;
+    private RyaTripleContext childRyaContext;
+
+    private RyaTableMutationsFactory ryaTableMutationFactory;
+
+    private Scanner childScanner;
+    private Iterator<Entry<Key, Value>> childIterator;
+    private Connector childConnector;
+    private AccumuloRyaDAO childDao;
+
+    private Date copyToolInputTime;
+    private Date copyToolRunTime;
+
+    private Long parentTimeOffset = 0L;
+    private Long childTimeOffset = 0L;
+
+    private boolean useTimeSync = false;
+    private boolean useMergeFileInput = false;
+
+    /**
+     * Creates a new {@link MergeToolMapper}.
+     */
+    public MergeToolMapper() {
+    }
+
+    /**
+     * The result of comparing a child key and parent key, which determines what should be done with them.
+     */
+    private static enum CompareKeysResult {
+        /**
+         * Indicates that the child iterator should move to the next key in the child
+         * table in order to be compared to the current key in the parent table.
+         */
+        ADVANCE_CHILD,
+        /**
+         * Indicates that the child iterator should move to the next key in the child
+         * table in order to be compared to the current key in the parent table,
+         * and that the current child key should be added to the parent.
+         */
+        ADVANCE_CHILD_AND_ADD,
+        /**
+         * Indicates that the parent iterator should move to the next key in the parent table
+         * in order to be compared to the current key in the child table.
+         */
+        ADVANCE_PARENT,
+        /**
+         * Indicates that the parent iterator should move to the next key in the parent table
+         * in order to be compared to the current key in the child table,
+         * and that the current parent key should be deleted from the parent.
+         */
+        ADVANCE_PARENT_AND_DELETE,
+        /**
+         * Indicates that the child iterator should move to the next key in the child table
+         * and the parent iterator should move to the next key in the parent table.
+         */
+        ADVANCE_BOTH,
+        /**
+         * Indicates that there are no more keys to compare in the child and parent tables.
+         */
+        FINISHED;
+    }
+
+    /**
+     * Runs the merge. The parent table is iterated through the map input and the child table through
+     * a scanner, in parallel; keys are compared and parent keys are added or deleted as needed. This
+     * overrides the default {@link Mapper#run} in order to control the iteration directly.
+     * @param context the Hadoop {@link Context}.
+     * @throws IOException if reading or writing fails.
+     * @throws InterruptedException if the Hadoop framework reports an interrupt.
+     */
+    @Override
+    public void run(Context context) throws IOException, InterruptedException {
+        setup(context);
+        this.context = context;
+
+        try {
+            RyaStatement parentRyaStatement = nextParentRyaStatement();
+            RyaStatement childRyaStatement = nextChildRyaStatement();
+
+            CompareKeysResult compareKeysResult = null;
+            // Iteratively compare parent keys to child keys until finished
+            while (compareKeysResult != CompareKeysResult.FINISHED) {
+                compareKeysResult = compareKeys(parentRyaStatement, childRyaStatement);
+
+                // Based on how the keys compare, add or delete keys and advance the child or parent iterators forward
+                switch (compareKeysResult) {
+                    case ADVANCE_CHILD:
+                        childRyaStatement = nextChildRyaStatement();
+                        break;
+                    case ADVANCE_PARENT:
+                        parentRyaStatement = nextParentRyaStatement();
+                        break;
+                    case ADVANCE_CHILD_AND_ADD:
+                        RyaStatement tempChildRyaStatement = childRyaStatement;
+                        childRyaStatement = nextChildRyaStatement();
+                        addKey(tempChildRyaStatement, context);
+                        break;
+                    case ADVANCE_PARENT_AND_DELETE:
+                        RyaStatement tempParentRyaStatement = parentRyaStatement;
+                        parentRyaStatement = nextParentRyaStatement();
+                        deleteKey(tempParentRyaStatement, context);
+                        break;
+                    case ADVANCE_BOTH:
+                        ColumnVisibility cv1 = new ColumnVisibility(parentRyaStatement.getColumnVisibility());
+                        ColumnVisibility cv2 = new ColumnVisibility(childRyaStatement.getColumnVisibility());
+
+                        // Update the column visibility now if necessary
+                        if (!cv1.equals(cv2) && !cv2.equals(AccumuloRdfConstants.EMPTY_CV)) {
+                            ColumnVisibility newCv = combineColumnVisibilities(cv1, cv2);
+                            RyaStatement newCvRyaStatement = updateRyaStatementColumnVisibility(parentRyaStatement, newCv);
+
+                            deleteKey(parentRyaStatement, context);
+                            addKey(newCvRyaStatement, context);
+                        }
+
+                        parentRyaStatement = nextParentRyaStatement();
+                        childRyaStatement = nextChildRyaStatement();
+                        break;
+                    case FINISHED:
+                        log.info("Finished scanning parent and child tables");
+                        break;
+                    default:
+                        log.error("Unknown result: " + compareKeysResult);
+                        break;
+                }
+            }
+        } catch (MutationsRejectedException | TripleRowResolverException e) {
+            log.error("Error encountered while merging", e);
+ } finally { + cleanup(context); + } + } + + private RyaStatement nextParentRyaStatement() throws IOException, InterruptedException { + return nextRyaStatement(context, parentRyaContext); + } + + private RyaStatement nextChildRyaStatement() throws IOException, InterruptedException { + return nextRyaStatement(childIterator, childRyaContext); + } + + private static RyaStatement nextRyaStatement(Iterator<Entry<Key, Value>> iterator, RyaTripleContext ryaContext) { + RyaStatement ryaStatement = null; + if (iterator.hasNext()) { + Entry<Key, Value> entry = iterator.next(); + Key key = entry.getKey(); + Value value = entry.getValue(); + try { + ryaStatement = createRyaStatement(key, value, ryaContext); + } catch (TripleRowResolverException e) { + log.error("TripleRowResolverException encountered while creating statement", e); + } + } + return ryaStatement; + } + + private static RyaStatement nextRyaStatement(Context context, RyaTripleContext ryaContext) throws IOException, InterruptedException { + RyaStatement ryaStatement = null; + if (context.nextKeyValue()) { + Key key = context.getCurrentKey(); + Value value = context.getCurrentValue(); + try { + ryaStatement = createRyaStatement(key, value, ryaContext); + } catch (TripleRowResolverException e) { + log.error("TripleRowResolverException encountered while creating statement", e); + } + } + return ryaStatement; + } + + private static RyaStatement createRyaStatement(Key key, Value value, RyaTripleContext ryaTripleContext) throws TripleRowResolverException { + byte[] row = key.getRowData() != null && key.getRowData().toArray().length > 0 ? key.getRowData().toArray() : null; + byte[] columnFamily = key.getColumnFamilyData() != null && key.getColumnFamilyData().toArray().length > 0 ? key.getColumnFamilyData().toArray() : null; + byte[] columnQualifier = key.getColumnQualifierData() != null && key.getColumnQualifierData().toArray().length > 0 ? key.getColumnQualifierData().toArray() : null; + Long timestamp = key.getTimestamp(); + byte[] columnVisibility = key.getColumnVisibilityData() != null && key.getColumnVisibilityData().toArray().length > 0 ? key.getColumnVisibilityData().toArray() : null; + byte[] valueBytes = value != null && value.get().length > 0 ? 
value.get() : null;
+        TripleRow tripleRow = new TripleRow(row, columnFamily, columnQualifier, timestamp, columnVisibility, valueBytes);
+        RyaStatement ryaStatement = ryaTripleContext.deserializeTriple(TABLE_LAYOUT.SPO, tripleRow);
+
+        return ryaStatement;
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+
+        log.info("Setting up mapper");
+
+        parentConfig = context.getConfiguration();
+        childConfig = getChildConfig(parentConfig);
+
+        startTimeString = parentConfig.get(MergeTool.START_TIME_PROP, null);
+        if (startTimeString != null) {
+            startTime = MergeTool.convertStartTimeStringToDate(startTimeString);
+        }
+        usesStartTime = startTime != null;
+        useTimeSync = parentConfig.getBoolean(CopyTool.USE_NTP_SERVER_PROP, false);
+        useMergeFileInput = parentConfig.getBoolean(MergeTool.USE_MERGE_FILE_INPUT, false);
+        parentTableName = parentConfig.get(MergeTool.TABLE_NAME_PROP, null);
+        parentTablePrefix = parentConfig.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+        childTablePrefix = childConfig.get(MRUtils.TABLE_PREFIX_PROPERTY, null);
+        if (useMergeFileInput) {
+            childTableName = parentTableName.replaceFirst(parentTablePrefix, childTablePrefix) + MergeTool.TEMP_SUFFIX;
+        } else {
+            childTableName = parentTableName.replaceFirst(parentTablePrefix, childTablePrefix);
+        }
+        spoTable = new Text(parentTablePrefix + RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX);
+        poTable = new Text(parentTablePrefix + RdfCloudTripleStoreConstants.TBL_PO_SUFFIX);
+        ospTable = new Text(parentTablePrefix + RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX);
+
+        childScanner = setupChildScanner(context);
+        childIterator = childScanner.iterator();
+
+        parentAccumuloRdfConfiguration = new AccumuloRdfConfiguration(parentConfig);
+        parentAccumuloRdfConfiguration.setTablePrefix(parentTablePrefix);
+        parentRyaContext = RyaTripleContext.getInstance(parentAccumuloRdfConfiguration);
+
+        ryaTableMutationFactory = new RyaTableMutationsFactory(parentRyaContext);
+
+        childAccumuloRdfConfiguration = new AccumuloRdfConfiguration(childConfig);
+        childAccumuloRdfConfiguration.setTablePrefix(childTablePrefix);
+        childRyaContext = RyaTripleContext.getInstance(childAccumuloRdfConfiguration);
+        childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
+
+        childDao = AccumuloRyaUtils.setupDao(childConnector, childAccumuloRdfConfiguration);
+
+        if (startTime != null && useTimeSync) {
+            try {
+                copyToolInputTime = AccumuloRyaUtils.getCopyToolSplitDate(childDao);
+                copyToolRunTime = AccumuloRyaUtils.getCopyToolRunDate(childDao);
+
+                // Find the parent's time offset that was stored when the child was copied.
+                parentTimeOffset = AccumuloRyaUtils.getTimeOffset(childDao);
+                String durationBreakdown = TimeUtils.getDurationBreakdown(parentTimeOffset);
+                log.info("The table " + parentTableName + " has a time offset of: " + durationBreakdown);
+                // Guard against a missing offset property to avoid a NumberFormatException from Long.valueOf(null).
+                String childTimeOffsetString = childConfig.get(CopyTool.CHILD_TIME_OFFSET_PROP, null);
+                if (childTimeOffsetString != null) {
+                    childTimeOffset = Long.valueOf(childTimeOffsetString);
+                }
+                Date adjustedParentStartTime = new Date(startTime.getTime() - parentTimeOffset);
+                Date adjustedChildStartTime = new Date(startTime.getTime() - childTimeOffset);
+                log.info("Adjusted parent start time: " + adjustedParentStartTime);
+                log.info("Adjusted child start time: " + adjustedChildStartTime);
+            } catch (RyaDAOException e) {
+                log.error("Error getting time offset", e);
+            }
+        }
+
+        log.info("Finished setting up mapper");
+    }
+
+    /**
+     * Takes the child instance values in the configuration and puts them into the corresponding parent
+     * instance properties so the config will connect to the child instance.
+     * @param parentConfig the {@link Configuration} containing the parent and child properties.
+     * @return the new {@link Configuration} where the parent connection values are replaced with
+     * the child connection values.
+     */
+    public static Configuration getChildConfig(Configuration parentConfig) {
+        Configuration childConfig = new Configuration(parentConfig);
+
+        // Switch the temp child properties to be the main ones
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_MOCK_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_INSTANCE_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_USERNAME_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_PWD_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.TABLE_PREFIX_PROPERTY);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_AUTH_PROP);
+        convertChildPropToParentProp(childConfig, parentConfig, RdfCloudTripleStoreConfiguration.CONF_QUERY_AUTH);
+        convertChildPropToParentProp(childConfig, parentConfig, MRUtils.AC_ZK_PROP);
+
+        MergeTool.setDuplicateKeys(childConfig);
+
+        return childConfig;
+    }
+
+    /**
+     * Looks up the given property in the parent/main configuration that is marked as a child value
+     * (by being appended with ".child") and converts it into an unmarked property for the child config to use.
+     * @param childConfig the child {@link Configuration}.
+     * @param parentConfig the parent/main {@link Configuration}.
+     * @param parentPropertyName the parent property name.
+     */
+    public static void convertChildPropToParentProp(Configuration childConfig, Configuration parentConfig, String parentPropertyName) {
+        String childValue = parentConfig.get(parentPropertyName + MergeTool.CHILD_SUFFIX, "");
+        childConfig.set(parentPropertyName, childValue);
+    }
+
+    /**
+     * Combines two {@link ColumnVisibility ColumnVisibilities} by OR'ing them together.
+     * @param cv1 the first (parent) {@link ColumnVisibility}.
+     * @param cv2 the second (child) {@link ColumnVisibility}.
+     * @return the newly combined {@link ColumnVisibility}.
+     */
+    public static ColumnVisibility combineColumnVisibilities(ColumnVisibility cv1, ColumnVisibility cv2) {
+        // OR the 2 column visibilities together if they're different
+        String columnVisibilityExpression;
+        if (cv1.equals(AccumuloRdfConstants.EMPTY_CV)) {
+            columnVisibilityExpression = new String(cv2.getExpression(), Charsets.UTF_8);
+        } else {
+            columnVisibilityExpression = "(" + new String(cv1.getExpression(), Charsets.UTF_8) + ")|("
+                    + new String(cv2.getExpression(), Charsets.UTF_8) + ")";
+        }
+        ColumnVisibility newCv = new ColumnVisibility(new Text(columnVisibilityExpression));
+        newCv = new ColumnVisibility(newCv.flatten());
+        return newCv;
+    }
+
+    private Scanner setupChildScanner(Context context) throws IOException {
+        return setupScanner(context, childTableName, childConfig);
+    }
+
+    private static Scanner setupScanner(Context context, String tableName, Configuration config) throws IOException {
+        RangeInputSplit split = (RangeInputSplit) context.getInputSplit();
+        Range splitRange = split.getRange();
+        Scanner scanner = AccumuloRyaUtils.getScanner(tableName, config);
+        scanner.setRange(splitRange);
+
+        return scanner;
+    }
+
+    private void writeRyaMutations(RyaStatement ryaStatement, Context context, boolean isDelete) throws IOException, InterruptedException {
+        if (ryaStatement.getColumnVisibility() == null) {
+            ryaStatement.setColumnVisibility(AccumuloRdfConstants.EMPTY_CV.getExpression());
+        }
+
+        Map<TABLE_LAYOUT, Collection<Mutation>> mutationMap = ryaTableMutationFactory.serialize(ryaStatement);
+        Collection<Mutation> spoMutations = mutationMap.get(TABLE_LAYOUT.SPO);
+        Collection<Mutation> poMutations = mutationMap.get(TABLE_LAYOUT.PO);
+        Collection<Mutation> ospMutations = mutationMap.get(TABLE_LAYOUT.OSP);
+
+        for (Mutation mutation : spoMutations) {
+            writeMutation(spoTable, mutation, context, isDelete);
+        }
+        for (Mutation mutation : poMutations) {
+            writeMutation(poTable, mutation, context, isDelete);
+        }
+        for (Mutation mutation : ospMutations) {
+            writeMutation(ospTable, mutation, context, isDelete);
+        }
+    }
+
+    private void addKey(RyaStatement ryaStatement, Context context) throws IOException, InterruptedException {
+        writeRyaMutations(ryaStatement, context, false);
+    }
+
+    private void deleteKey(RyaStatement ryaStatement, Context context) throws IOException, InterruptedException {
+        writeRyaMutations(ryaStatement, context, true);
+    }
+
+    /**
+     * Writes a mutation to the specified table. If the mutation is meant to delete, then it will
+     * be transformed into a delete mutation.
+     * @param table the table to write to.
+     * @param mutation the {@link Mutation}.
+     * @param context the {@link Context}.
+     * @param isDelete {@code true} if the mutation should be a delete mutation. {@code false} otherwise.
+     * @throws IOException
+     * @throws InterruptedException
+     */
+    private static void writeMutation(Text table, Mutation mutation, Context context, boolean isDelete) throws IOException, InterruptedException {
+        if (isDelete) {
+            List<ColumnUpdate> updates = mutation.getUpdates();
+            ColumnUpdate columnUpdate = updates.get(0);
+            ColumnVisibility cv = columnUpdate.getColumnVisibility() != null ?
new ColumnVisibility(columnUpdate.getColumnVisibility()) : null; + Mutation deleteMutation = new Mutation(new Text(mutation.getRow())); + deleteMutation.putDelete(columnUpdate.getColumnFamily(), columnUpdate.getColumnQualifier(), cv, columnUpdate.getTimestamp()); + context.write(table, deleteMutation); + } else { + context.write(table, mutation); + } + } + + /** + * Adjusts the date of a key's timestamp to account for the instance's machine local time offset. + * @param date the timestamp {@link Date} to adjust. + * @param isParentTable {@code true} if the timestamp is from a key in one of the parent instance's tables. + * {@code false} if it's from the child instance. + * @return the normalized {@link Date} or the same date if nothing needed to be adjusted. + */ + private Date normalizeDate(Date date, boolean isParentTable) { + Date normalizedDate = date; + if (useTimeSync) { + if (isParentTable) { + normalizedDate = new Date(date.getTime() - parentTimeOffset); + } else { + // If the timestamp is before the time the child table was copied from + // the parent then the timestamp originated from the parent machine + if (TimeUtils.dateBeforeInclusive(date, copyToolRunTime)) { + normalizedDate = new Date(date.getTime() - parentTimeOffset); + } else { + // Timestamps after the copy time originated from the child machine. + normalizedDate = new Date(date.getTime() - childTimeOffset); + } + } + } + return normalizedDate; + } + + /** + * Since both Scanners will return sorted data, if the two key-values are + * equal, then both Scanners can advance to the next comparison. If the Key + * from Scanner1 sorts before the Key from Scanner2, then that Key doesn't + * exist in the table from Scanner2 which means Scanner1 should advance. If + * the Key from Scanner2 sorts before the Key from Scanner1, then that Key + * doesn't exist in the table from Scanner1 which means Scanner2 should + * advance. + * @param key1 the {@link RyaStatement} from the parent instance table. + * @param key2 the {@link RyaStatement} from the child instance table. + * @return the {@link CompareKeysResult}. + * @throws MutationsRejectedException + * @throws IOException + * @throws InterruptedException + * @throws TripleRowResolverException + */ + private CompareKeysResult compareKeys(RyaStatement key1, RyaStatement key2) throws MutationsRejectedException, IOException, InterruptedException, TripleRowResolverException { + log.trace("key1 = " + key1); + log.trace("key2 = " + key2); + if (key1 == null && key2 == null) { + // Reached the end of the parent and child table. + return CompareKeysResult.FINISHED; + } else if (key1 == null) { + // Reached the end of the parent table so add the remaining child keys if they meet the time criteria. + Date t2 = normalizeDate(new Date(key2.getTimestamp()), false); + // Move on to next comparison (do nothing) or add this child key to parent + boolean doNothing = usesStartTime && t2.before(startTime); + return doNothing ? CompareKeysResult.ADVANCE_CHILD : CompareKeysResult.ADVANCE_CHILD_AND_ADD; + } else if (key2 == null) { + // Reached the end of the child table so delete the remaining parent keys if they meet the time criteria. + Date t1 = normalizeDate(new Date(key1.getTimestamp()), true); + // Move on to next comparison (do nothing) or delete this key from parent + boolean doNothing = usesStartTime && (copyToolInputTime != null && (t1.before(copyToolInputTime) || (t1.after(copyToolInputTime) && t1.after(startTime))) || (copyToolInputTime == null && t1.after(startTime))); + return doNothing ? 
CompareKeysResult.ADVANCE_PARENT : CompareKeysResult.ADVANCE_PARENT_AND_DELETE; + } else { + // There are 2 keys to compare + Map<TABLE_LAYOUT, TripleRow> map1 = parentRyaContext.serializeTriple(key1); + Text row1 = new Text(map1.get(TABLE_LAYOUT.SPO).getRow()); + Map<TABLE_LAYOUT, TripleRow> map2 = childRyaContext.serializeTriple(key2); + Text row2 = new Text(map2.get(TABLE_LAYOUT.SPO).getRow()); + Date t1 = normalizeDate(new Date(key1.getTimestamp()), true); + Date t2 = normalizeDate(new Date(key2.getTimestamp()), false); + + if (row1.compareTo(row2) < 0) { + // Parent key sort order was before the child key sort order + // so it doesn't exist in the child table. + // What does this mean? Was it added by the parent after the child was cloned? (Meaning we should leave it) + // Or did the child delete it after it was cloned? (Meaning we should delete it) + boolean doNothing = usesStartTime && (copyToolInputTime != null && (t1.before(copyToolInputTime) || (t1.after(copyToolInputTime) && t1.after(startTime))) || (copyToolInputTime == null && t1.after(startTime))); + return doNothing ? CompareKeysResult.ADVANCE_PARENT : CompareKeysResult.ADVANCE_PARENT_AND_DELETE; + } else if (row1.compareTo(row2) > 0) { + // Parent key sort order was after the child key sort order + // so it doesn't exist in the parent table. + // What does this mean? Was it deleted by the parent after the child was cloned? (Meaning we should leave it) + // Or did the child add it after it was cloned? (Meaning we should add it) + boolean doNothing = usesStartTime && t2.before(startTime); + return doNothing ? CompareKeysResult.ADVANCE_CHILD : CompareKeysResult.ADVANCE_CHILD_AND_ADD; + } else { + // Rows are the same. So just check if column visibility needs to be updated and + // move on to the next parent and child keys. 
+ return CompareKeysResult.ADVANCE_BOTH; + } + } + } + + private static RyaStatement updateRyaStatementColumnVisibility(RyaStatement ryaStatement, ColumnVisibility newCv) { + RyaStatement newCvRyaStatement = new RyaStatement(ryaStatement.getSubject(), ryaStatement.getPredicate(), ryaStatement.getObject(), ryaStatement.getContext(), ryaStatement.getQualifer(), newCv.getExpression(), ryaStatement.getValue(), ryaStatement.getTimestamp()); + return newCvRyaStatement; + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + super.cleanup(context); + log.info("Cleaning up mapper..."); + if (childScanner != null) { + childScanner.close(); + } + try { + if (childDao != null) { + childDao.destroy(); + } + } catch (RyaDAOException e) { + log.error("Error destroying child DAO", e); + } + log.info("Cleaned up mapper"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/RowRuleMapper.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/RowRuleMapper.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/RowRuleMapper.java new file mode 100644 index 0000000..76b24be --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/mappers/RowRuleMapper.java @@ -0,0 +1,200 @@ +package mvm.rya.accumulo.mr.merge.mappers; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import java.io.IOException; +import java.util.Map; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.admin.SecurityOperations; +import org.apache.accumulo.core.client.admin.TableOperations; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.log4j.Logger; + +import mvm.rya.accumulo.mr.merge.MergeTool; +import mvm.rya.accumulo.mr.merge.util.AccumuloRyaUtils; +import mvm.rya.accumulo.mr.merge.util.GroupedRow; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.persist.RyaDAOException; + +/** + * Rule mapper that outputs statements as serialized Accumulo rows along with their destination + * table names, using {@link GroupedRow} as a composite Writable to represent an output + * of <(table, Key), (Key, Value)>. + * + * In order to ensure proper indexing, the statements are first + * inserted into a mock Rya instance in memory, and the tables that make up that instance are + * periodically flushed to the output. 
+ */
+public class RowRuleMapper extends BaseRuleMapper<GroupedRow, GroupedRow> {
+    /**
+     * Configuration property for the number of statements each mapper will hold in memory before flushing
+     * the serialized rows to the output.
+     */
+    public static final String MAX_STATEMENTS_PROP = "ac.copy.cache.statements.max";
+
+    /**
+     * The default number of statements each mapper will hold in memory before flushing the serialized rows to the output.
+     */
+    public static final int MAX_STATEMENTS_DEFAULT = 10000;
+
+    private static final Logger log = Logger.getLogger(RowRuleMapper.class);
+    private GroupedRow compositeKey = new GroupedRow();
+    private GroupedRow compositeVal = new GroupedRow();
+    private int cachedStatements = 0;
+    private int maxStatements;
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        super.setup(context);
+        // Set up a mock child DAO for caching serialized statements
+        childAccumuloRdfConfiguration.setBoolean(MRUtils.AC_MOCK_PROP, true);
+        childUser = "root";
+        childAccumuloRdfConfiguration.set(MRUtils.AC_USERNAME_PROP, childUser);
+        childAccumuloRdfConfiguration.set(MRUtils.AC_PWD_PROP, "");
+        MergeTool.setDuplicateKeys(childAccumuloRdfConfiguration);
+        childConnector = AccumuloRyaUtils.setupConnector(childAccumuloRdfConfiguration);
+        childDao = AccumuloRyaUtils.setupDao(childConnector, childAccumuloRdfConfiguration);
+        copyAuthorizations();
+        addMetadataKeys(context); // the call in super.setup() does nothing; this has to be done after the DAO is initialized
+        // Determine the size of the cache
+        maxStatements = childAccumuloRdfConfiguration.getInt(MAX_STATEMENTS_PROP, MAX_STATEMENTS_DEFAULT);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        flush(context);
+        super.cleanup(context);
+    }
+
+    /**
+     * Add a statement to an in-memory Accumulo instance, serialized as spo/pos/osp and any applicable secondary
+     * indexes, and flush the in-memory rows through to the output if enough statements have been cached.
+     * @param rstmt RyaStatement to copy to the child
+     * @param context Context to use for writing
+     * @throws InterruptedException if Hadoop is interrupted while writing output.
+     * @throws IOException if an error is encountered serializing and storing the statement in memory, or
+     *         if Hadoop reports an error writing the in-memory tables to the output.
+     */
+    @Override
+    protected void copyStatement(RyaStatement rstmt, Context context) throws IOException, InterruptedException {
+        try {
+            childDao.add(rstmt);
+            cachedStatements++;
+        } catch (RyaDAOException e) {
+            throw new IOException("Error serializing RyaStatement", e);
+        }
+        if (cachedStatements >= maxStatements) {
+            flush(context);
+        }
+    }
+
+    /**
+     * Copy a row directly -- just pass it through to the map output.
+     * @param key Row's key
+     * @param value Row's value
+     * @param context Context to use for writing
+     * @throws InterruptedException If Hadoop is interrupted during map output
+     * @throws IOException If Hadoop reports an error writing the output
+     */
+    @Override
+    protected void copyRow(Key key, Value value, Context context) throws IOException, InterruptedException {
+        compositeKey.setGroup(childTableName);
+        compositeKey.setKey(key);
+        compositeVal.setKey(key);
+        compositeVal.setValue(value);
+        context.write(compositeKey, compositeVal);
+    }
+
+    /**
+     * Insert copy tool metadata, if the in-memory instance has been configured.
+ */ + @Override + protected void addMetadataKeys(Context context) throws IOException { + try { + if (childDao != null && childDao.isInitialized()) { + if (runTime != null) { + RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolRunTimeRyaStatement(runTime); + copyStatement(ryaStatement, context); + } + if (startTime != null) { + RyaStatement ryaStatement = AccumuloRyaUtils.createCopyToolSplitTimeRyaStatement(startTime); + copyStatement(ryaStatement, context); + } + if (timeOffset != null) { + RyaStatement ryaStatement = AccumuloRyaUtils.createTimeOffsetRyaStatement(timeOffset); + copyStatement(ryaStatement, context); + } + } + } catch (RyaDAOException | IOException | InterruptedException e) { + throw new IOException("Failed to write metadata key", e); + } + } + + private void flush(Context context) throws IOException, InterruptedException { + try { + childDao.flush(); + } catch (RyaDAOException e) { + throw new IOException("Error writing to in-memory table", e); + } + TableOperations ops = childConnector.tableOperations(); + SecurityOperations secOps = childConnector.securityOperations(); + Authorizations childAuths; + try { + childAuths = secOps.getUserAuthorizations(childUser); + } catch (AccumuloException | AccumuloSecurityException e) { + throw new IOException("Error connecting to mock instance", e); + } + for (String table : ops.list()) { + // Only copy Rya tables (skip system tables) + if (!table.startsWith(childTablePrefix)) { + continue; + } + compositeKey.setGroup(table); + try { + // Output every row in this mock table + int rows = 0; + Scanner scanner = childDao.getConnector().createScanner(table, childAuths); + for (Map.Entry<Key, Value> row : scanner) { + compositeKey.setKey(row.getKey()); + compositeVal.setKey(row.getKey()); + compositeVal.setValue(row.getValue()); + context.write(compositeKey, compositeVal); + rows++; + } + log.info("Flushed " + rows + " in-memory rows to output (" + table + ")."); + // Then clear the table + if (rows > 0) { + ops.deleteRows(table, null, null); + } + } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { + throw new IOException("Error flushing in-memory table", e); + } + } + // All tables should be empty + cachedStatements = 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java ---------------------------------------------------------------------- diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java new file mode 100644 index 0000000..c526e6d --- /dev/null +++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java @@ -0,0 +1,64 @@ +package mvm.rya.accumulo.mr.merge.reducers; + +/* + * #%L + * mvm.rya.accumulo.mr.merge + * %% + * Copyright (C) 2014 Rya + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java
new file mode 100644
index 0000000..c526e6d
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/reducers/MultipleFileReducer.java
@@ -0,0 +1,64 @@
+package mvm.rya.accumulo.mr.merge.reducers;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.IOException;
+
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+
+import mvm.rya.accumulo.mr.merge.util.GroupedRow;
+
+/**
+ * Outputs rows to different files according to their associated group names, for use with
+ * {@link org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat}.
+ */
+public class MultipleFileReducer extends Reducer<GroupedRow, GroupedRow, Key, Value> {
+    private MultipleOutputs<Key, Value> mos;
+
+    @Override
+    protected void setup(Context context) {
+        mos = new MultipleOutputs<>(context);
+    }
+
+    @Override
+    protected void cleanup(Context context) throws IOException, InterruptedException {
+        if (mos != null) {
+            mos.close();
+        }
+    }
+
+    /**
+     * Writes {@link Key}/{@link Value} pairs to a file whose path is determined by the group.
+     * @param group Contains the group name (a String) that is used to route output to the appropriate subdirectory.
+     * @param rows Contains the Accumulo rows to be written for this group.
+     * @param context Context for writing.
+     */
+    @Override
+    protected void reduce(GroupedRow group, Iterable<GroupedRow> rows, Context context) throws IOException, InterruptedException {
+        String groupName = group.getGroup().toString();
+        String destination = groupName + "/files/part";
+        for (GroupedRow row : rows) {
+            mos.write(row.getKey(), row.getValue(), destination);
+        }
+    }
+}
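A reducer like this only produces per-group RFiles when the job's output is routed through AccumuloFileOutputFormat, because the base-output-path form of MultipleOutputs.write() reuses the job's configured output format. A rough sketch of the job wiring this reducer appears to assume is below; the job name, output path, and mapper configuration are illustrative, and the tool's real job setup is not part of this diff:

    import org.apache.accumulo.core.client.mapreduce.AccumuloFileOutputFormat;
    import org.apache.accumulo.core.data.Key;
    import org.apache.accumulo.core.data.Value;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import mvm.rya.accumulo.mr.merge.reducers.MultipleFileReducer;
    import mvm.rya.accumulo.mr.merge.util.GroupedRow;

    public class CopyJobWiringSketch {
        public static Job configure(Configuration conf) throws Exception {
            Job job = Job.getInstance(conf, "copy-to-files");
            // Mapper and input format setup elided; the mapper must emit <GroupedRow, GroupedRow> pairs.
            job.setMapOutputKeyClass(GroupedRow.class);
            job.setMapOutputValueClass(GroupedRow.class);
            job.setReducerClass(MultipleFileReducer.class);
            // MultipleOutputs writes each group under <group>/files/part-* beneath this base path.
            job.setOutputFormatClass(AccumuloFileOutputFormat.class);
            job.setOutputKeyClass(Key.class);
            job.setOutputValueClass(Value.class);
            FileOutputFormat.setOutputPath(job, new Path("/tmp/copy-output"));
            return job;
        }
    }

With this wiring, a write to the base path "spo/files/part" lands at <output dir>/spo/files/part-r-00000, giving one importable directory of RFiles per table group.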
http://git-wip-us.apache.org/repos/asf/incubator-rya/blob/71b5b1c9/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloInstanceDriver.java
----------------------------------------------------------------------
diff --git a/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloInstanceDriver.java b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloInstanceDriver.java
new file mode 100644
index 0000000..bc7abdc
--- /dev/null
+++ b/extras/rya.merger/src/main/java/mvm/rya/accumulo/mr/merge/util/AccumuloInstanceDriver.java
@@ -0,0 +1,575 @@
+package mvm.rya.accumulo.mr.merge.util;
+
+/*
+ * #%L
+ * mvm.rya.accumulo.mr.merge
+ * %%
+ * Copyright (C) 2014 Rya
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.ZooKeeperInstance;
+import org.apache.accumulo.core.client.admin.SecurityOperations;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.minicluster.MiniAccumuloCluster;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.SystemUtils;
+import org.apache.log4j.Logger;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+
+import mvm.rya.accumulo.AccumuloRdfConfiguration;
+import mvm.rya.accumulo.AccumuloRyaDAO;
+import mvm.rya.accumulo.mr.merge.MergeTool;
+import mvm.rya.accumulo.mr.utils.MRUtils;
+import mvm.rya.api.RdfCloudTripleStoreConstants;
+import mvm.rya.api.persist.RyaDAOException;
+import mvm.rya.indexing.accumulo.ConfigUtils;
+
+/**
+ * Handles running a single {@link MiniAccumuloCluster} or a single {@link MockInstance}
+ * and manages the tables, user, and DAO tied to it.
+ */
+public class AccumuloInstanceDriver {
+    private static final Logger log = Logger.getLogger(AccumuloInstanceDriver.class);
+
+    private static final boolean IS_COPY_HADOOP_HOME_ENABLED = true;
+
+    public static final String ROOT_USER_NAME = "root";
+
+    private String driverName;
+    private boolean isMock;
+    private boolean shouldCreateIndices;
+    private boolean isReadOnly;
+    private boolean isParent;
+
+    private String user;
+    private String password;
+    private String instanceName;
+    private String tablePrefix;
+    private String auth;
+
+    private Connector connector;
+
+    private AccumuloRyaDAO dao;
+
+    private SecurityOperations secOps;
+
+    private AccumuloRdfConfiguration config = new AccumuloRdfConfiguration();
+
+    private MiniAccumuloCluster miniAccumuloCluster = null;
+
+    private MockInstance mockInstance = null;
+
+    private ZooKeeperInstance zooKeeperInstance = null;
+
+    private Instance instance = null;
+
+    private String zooKeepers;
+
+    private Map<String, String> configMap = new LinkedHashMap<>();
+
+    private List<String> indices = null;
+
+    private List<String> tableList = new ArrayList<>();
+
+    private File tempDir = null;
+
+    public static final List<String> TABLE_NAME_SUFFIXES =
+        ImmutableList.<String>of(
+            RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_PO_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_NS_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_EVAL_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_STATS_SUFFIX,
+            RdfCloudTripleStoreConstants.TBL_SEL_SUFFIX
+        );
+
+    /**
+     * Creates a new instance of {@link AccumuloInstanceDriver}.
+     * @param driverName the name used to identify this driver in the logs. (not {@code null})
+     * @param isMock {@code true} if the driver should run a {@link MockInstance}.
+     * {@code false} if it should run a {@link MiniAccumuloCluster}.
+     * @param shouldCreateIndices {@code true} to create all the indices associated with a Rya deployment.
+     * {@code false} otherwise.
+     * @param isReadOnly {@code true} if all the tables in the instance should have their
+     * table permissions set to read only. {@code false} if the table permissions are set to write.
+     * @param isParent {@code true} if the instance is the parent/main instance. {@code false} if it's the
+     * child.
+     * @param user the user name tied to this instance.
+     * @param password the password for the user.
+     * @param instanceName the name of the instance.
+     * @param tablePrefix the table prefix.
+     * @param auth the comma-separated authorization list.
+     */
+    public AccumuloInstanceDriver(String driverName, boolean isMock, boolean shouldCreateIndices, boolean isReadOnly, boolean isParent, String user, String password, String instanceName, String tablePrefix, String auth) {
+        this.driverName = Preconditions.checkNotNull(driverName);
+        this.isMock = isMock;
+        this.shouldCreateIndices = shouldCreateIndices;
+        this.isReadOnly = isReadOnly;
+        this.user = user;
+        this.password = password;
+        this.instanceName = instanceName;
+        this.tablePrefix = tablePrefix;
+        this.auth = auth;
+        this.isParent = isParent;
+
+        config.setTablePrefix(tablePrefix);
+    }
+
+    /**
+     * Sets up the {@link AccumuloInstanceDriver}.
+     * @throws Exception
+     */
+    public void setUp() throws Exception {
+        setUpInstance();
+        setUpTables();
+        setUpDao();
+        setUpConfig();
+    }
+
+    /**
+     * Sets up the {@link MiniAccumuloCluster} or the {@link MockInstance}.
+     * @throws Exception
+     */
+    public void setUpInstance() throws Exception {
+        if (!isMock) {
+            log.info("Setting up " + driverName + " MiniAccumulo cluster...");
+            // Create and run the MiniAccumulo cluster in a temporary directory
+            tempDir = Files.createTempDir();
+            tempDir.deleteOnExit();
+            miniAccumuloCluster = new MiniAccumuloCluster(tempDir, password);
+            copyHadoopHomeToTemp();
+            miniAccumuloCluster.getConfig().setInstanceName(instanceName);
+            log.info(driverName + " MiniAccumulo instance starting up...");
+            miniAccumuloCluster.start();
+            // Give the cluster a moment to finish starting up
+            Thread.sleep(1000);
+            log.info(driverName + " MiniAccumulo instance started");
+            log.info("Creating connector to " + driverName + " MiniAccumulo instance...");
+            zooKeeperInstance = new ZooKeeperInstance(miniAccumuloCluster.getClientConfig());
+            instance = zooKeeperInstance;
+            connector = zooKeeperInstance.getConnector(user, new PasswordToken(password));
+            log.info("Created connector to " + driverName + " MiniAccumulo instance");
+        } else {
+            log.info("Setting up " + driverName + " mock instance...");
+            mockInstance = new MockInstance(instanceName);
+            instance = mockInstance;
+            connector = mockInstance.getConnector(user, new PasswordToken(password));
+            log.info("Created connector to " + driverName + " mock instance");
+        }
+        zooKeepers = instance.getZooKeepers();
+    }
+
+    /**
+     * Copies the HADOOP_HOME bin directory to the {@link MiniAccumuloCluster} temp directory.
+     * On Windows, {@link MiniAccumuloCluster} expects to find bin/winutils.exe in its temp
+     * directory rather than under HADOOP_HOME.
+     * @throws IOException
+     */
+    private void copyHadoopHomeToTemp() throws IOException {
+        if (IS_COPY_HADOOP_HOME_ENABLED && SystemUtils.IS_OS_WINDOWS) {
+            String hadoopHomeEnv = System.getenv("HADOOP_HOME");
+            if (hadoopHomeEnv != null) {
+                File hadoopHomeDir = new File(hadoopHomeEnv);
+                if (hadoopHomeDir.exists()) {
+                    File binDir = Paths.get(hadoopHomeDir.getAbsolutePath(), "/bin").toFile();
+                    if (binDir.exists()) {
+                        FileUtils.copyDirectoryToDirectory(binDir, tempDir);
+                    } else {
+                        log.warn("The specified path for the Hadoop bin directory does not exist: " + binDir.getAbsolutePath());
+                    }
+                } else {
+                    log.warn("The specified path for HADOOP_HOME does not exist: " + hadoopHomeDir.getAbsolutePath());
+                }
+            } else {
+                log.warn("The HADOOP_HOME environment variable was not found.");
+            }
+        }
+    }
+
+    /**
+     * Sets up all the tables and indices.
+     * @throws Exception
+     */
+    public void setUpTables() throws Exception {
+        // Setup tables and permissions
+        log.info("Setting up " + driverName + " tables and permissions");
+        for (String tableSuffix : TABLE_NAME_SUFFIXES) {
+            String tableName = tablePrefix + tableSuffix;
+            tableList.add(tableName);
+            if (!connector.tableOperations().exists(tableName)) {
+                connector.tableOperations().create(tableName);
+            }
+        }
+
+        if (shouldCreateIndices) {
+            indices = Arrays.asList(
+                ConfigUtils.getFreeTextDocTablename(config),
+                ConfigUtils.getFreeTextTermTablename(config),
+                ConfigUtils.getGeoTablename(config),
+                ConfigUtils.getTemporalTableName(config),
+                ConfigUtils.getEntityTableName(config)
+            );
+
+            tableList.addAll(indices);
+
+            log.info("Setting up " + driverName + " indices");
+            for (String index : indices) {
+                if (!connector.tableOperations().exists(index)) {
+                    connector.tableOperations().create(index);
+                }
+            }
+        }
+
+        // Setup user with authorizations
+        log.info("Creating " + driverName + " user and authorizations");
+        secOps = connector.securityOperations();
+        if (!user.equals(ROOT_USER_NAME)) {
+            secOps.createLocalUser(user, new PasswordToken(password));
+        }
+        addAuths(auth);
+        TablePermission tablePermission = isReadOnly ? TablePermission.READ : TablePermission.WRITE;
+        for (String tableSuffix : TABLE_NAME_SUFFIXES) {
+            secOps.grantTablePermission(user, tablePrefix + tableSuffix, tablePermission);
+        }
+        if (shouldCreateIndices) {
+            for (String index : indices) {
+                secOps.grantTablePermission(user, index, tablePermission);
+            }
+        }
+    }
+
+    /**
+     * Sets up the {@link AccumuloRyaDAO}.
+     * @throws Exception
+     */
+    public void setUpDao() throws Exception {
+        // Setup dao
+        log.info("Creating " + driverName + " DAO");
+        dao = new AccumuloRyaDAO();
+        dao.setConnector(connector);
+        dao.setConf(config);
+
+        // Flush the tables before initializing the DAO
+        for (String tableName : tableList) {
+            connector.tableOperations().flush(tableName, null, null, false);
+        }
+
+        dao.init();
+    }
+
+    /**
+     * Sets up the configuration and prints the arguments.
+     */
+    public void setUpConfig() {
+        log.info("Setting " + driverName + " config");
+
+        // Setup config
+        if (isMock) {
+            configMap.put(MRUtils.AC_MOCK_PROP, Boolean.TRUE.toString());
+        }
+        configMap.put(MRUtils.AC_INSTANCE_PROP, instanceName);
+        configMap.put(MRUtils.AC_USERNAME_PROP, user);
+        configMap.put(MRUtils.AC_PWD_PROP, password);
+        configMap.put(MRUtils.TABLE_PREFIX_PROPERTY, tablePrefix);
+        configMap.put(MRUtils.AC_AUTH_PROP, auth);
+        configMap.put(MRUtils.AC_ZK_PROP, zooKeepers != null ? zooKeepers : "localhost");
+
+        log.info(driverName + " config properties:");
+        config.setTablePrefix(tablePrefix);
+        for (Entry<String, String> entry : configMap.entrySet()) {
+            String key = entry.getKey();
+            String value = entry.getValue();
+            String argument = ToolConfigUtils.makeArgument(isParent ? key : key + MergeTool.CHILD_SUFFIX, value);
+            log.info(argument);
+            config.set(key, value);
+        }
+
+        MergeTool.setDuplicateKeys(config);
+    }
+
+    /**
+     * Tears down all the tables and indices.
+     * @throws Exception
+     */
+    public void tearDownTables() throws Exception {
+        // Delete all the tables.
+        if (connector != null) {
+            for (String tableName : tableList) {
+                if (connector.tableOperations().exists(tableName)) {
+                    connector.tableOperations().delete(tableName);
+                }
+            }
+        }
+    }
+
+    /**
+     * Tears down the {@link AccumuloRyaDAO}.
+     * @throws Exception
+     */
+    public void tearDownDao() throws Exception {
+        if (dao != null) {
+            log.info("Stopping " + driverName + " DAO");
+            try {
+                dao.destroy();
+            } catch (RyaDAOException e) {
+                log.error("Error stopping " + driverName + " DAO", e);
+            }
+            dao = null;
+        }
+    }
+
+    /**
+     * Tears down the instance.
+     * @throws Exception
+     */
+    public void tearDownInstance() throws Exception {
+        if (miniAccumuloCluster != null) {
+            log.info("Stopping " + driverName + " cluster");
+            try {
+                miniAccumuloCluster.stop();
+            } catch (IOException | InterruptedException e) {
+                log.error("Error stopping " + driverName + " cluster", e);
+            }
+            miniAccumuloCluster = null;
+        }
+    }
+
+    /**
+     * Tears down the {@link AccumuloInstanceDriver}.
+     * @throws Exception
+     */
+    public void tearDown() throws Exception {
+        try {
+            //tearDownTables();
+            tearDownDao();
+            tearDownInstance();
+        } finally {
+            removeTempDir();
+        }
+    }
+
+    /**
+     * Deletes the {@link MiniAccumuloCluster} temporary directory.
+     */
+    public void removeTempDir() {
+        if (tempDir != null) {
+            try {
+                FileUtils.deleteDirectory(tempDir);
+            } catch (IOException e) {
+                log.error("Error deleting " + driverName + " temp directory", e);
+            }
+            tempDir = null;
+        }
+    }
+
+    /**
+     * Adds the given authorizations to this instance's user.
+     * @param auths the list of authorizations to add.
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     */
+    public void addAuths(String... auths) throws AccumuloException, AccumuloSecurityException {
+        Authorizations newAuths = AccumuloRyaUtils.addUserAuths(user, secOps, auths);
+        secOps.changeUserAuthorizations(user, newAuths);
+    }
+
+    /**
+     * @return the {@link Authorizations} of this instance's user, or {@code null} if
+     * the instance has not been set up yet.
+     * @throws AccumuloException
+     * @throws AccumuloSecurityException
+     */
+    public Authorizations getAuths() throws AccumuloException, AccumuloSecurityException {
+        if (secOps != null) {
+            return secOps.getUserAuthorizations(user);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * @return {@code true} if this is a mock instance. {@code false} if this is a MiniAccumuloCluster instance.
+     */
+    public boolean isMock() {
+        return isMock;
+    }
+
+    /**
+     * @return {@code true} to create all the indices associated with a Rya deployment.
+     * {@code false} otherwise.
+     */
+    public boolean shouldCreateIndices() {
+        return shouldCreateIndices;
+    }
+
+    /**
+     * @return {@code true} if all the tables in the instance should have their
+     * table permissions set to read only. {@code false} if the table permissions are set to write.
+     */
+    public boolean isReadOnly() {
+        return isReadOnly;
+    }
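addAuths() above delegates the merge to AccumuloRyaUtils.addUserAuths(), which is not part of this diff; reading from the call site, it appears to combine the new authorizations with the ones the user already holds. A standalone sketch of that merge using only the Accumulo client API (the class and variable names are illustrative, and the merge semantics are an assumption based on the call site):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.Set;

    import org.apache.accumulo.core.client.AccumuloException;
    import org.apache.accumulo.core.client.AccumuloSecurityException;
    import org.apache.accumulo.core.client.admin.SecurityOperations;
    import org.apache.accumulo.core.security.Authorizations;

    public class AuthMergeSketch {
        /** Merges the given authorizations into the user's existing set (assumed behavior). */
        public static void addUserAuths(SecurityOperations secOps, String user, String... newAuths)
                throws AccumuloException, AccumuloSecurityException {
            Set<String> merged = new LinkedHashSet<>();
            // Authorizations iterates over its entries as UTF-8 byte arrays.
            for (byte[] auth : secOps.getUserAuthorizations(user)) {
                merged.add(new String(auth, StandardCharsets.UTF_8));
            }
            merged.addAll(Arrays.asList(newAuths));
            secOps.changeUserAuthorizations(user, new Authorizations(merged.toArray(new String[0])));
        }
    }

Merging rather than overwriting matters here because changeUserAuthorizations() replaces the user's entire authorization set in one call.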
+
+    /**
+     * @return the user name tied to this instance.
+     */
+    public String getUser() {
+        return user;
+    }
+
+    /**
+     * @return the password for the user.
+     */
+    public String getPassword() {
+        return password;
+    }
+
+    /**
+     * @return the name of the instance.
+     */
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    /**
+     * @return the table prefix.
+     */
+    public String getTablePrefix() {
+        return tablePrefix;
+    }
+
+    /**
+     * @return the comma-separated authorization list.
+     */
+    public String getAuth() {
+        return auth;
+    }
+
+    /**
+     * @return the {@link Connector} to this instance.
+     */
+    public Connector getConnector() {
+        return connector;
+    }
+
+    /**
+     * Sets the {@link Connector} to this instance.
+     * @param connector the {@link Connector}.
+     */
+    public void setConnector(Connector connector) {
+        this.connector = connector;
+    }
+
+    /**
+     * @return the {@link AccumuloRyaDAO}.
+     */
+    public AccumuloRyaDAO getDao() {
+        return dao;
+    }
+
+    /**
+     * @return the {@link SecurityOperations}.
+     */
+    public SecurityOperations getSecOps() {
+        return secOps;
+    }
+
+    /**
+     * @return the {@link AccumuloRdfConfiguration}.
+     */
+    public AccumuloRdfConfiguration getConfig() {
+        return config;
+    }
+
+    /**
+     * @return the {@link MiniAccumuloCluster} for this instance or {@code null} if
+     * this is a {@link MockInstance}.
+     */
+    public MiniAccumuloCluster getMiniAccumuloCluster() {
+        return miniAccumuloCluster;
+    }
+
+    /**
+     * @return the {@link MockInstance} for this instance or {@code null} if
+     * this is a {@link MiniAccumuloCluster}.
+     */
+    public MockInstance getMockInstance() {
+        return mockInstance;
+    }
+
+    /**
+     * @return the {@link ZooKeeperInstance} for this instance or {@code null} if
+     * this is a {@link MockInstance}.
+     */
+    public ZooKeeperInstance getZooKeeperInstance() {
+        return zooKeeperInstance;
+    }
+
+    /**
+     * @return the underlying {@link ZooKeeperInstance} or {@link MockInstance}.
+     */
+    public Instance getInstance() {
+        return instance;
+    }
+
+    /**
+     * @return the comma-separated list of ZooKeeper host names.
+     */
+    public String getZooKeepers() {
+        return zooKeepers;
+    }
+
+    /**
+     * @return an unmodifiable map of the configuration keys and values.
+     */
+    public Map<String, String> getConfigMap() {
+        return Collections.unmodifiableMap(configMap);
+    }
+
+    /**
+     * @return an unmodifiable list of the table names and indices.
+     */
+    public List<String> getTableList() {
+        return Collections.unmodifiableList(tableList);
+    }
+
+    /**
+     * @return the {@link MiniAccumuloCluster} temporary directory for this instance or {@code null}
+     * if it's a {@link MockInstance}.
+     */
+    public File getTempDir() {
+        return tempDir;
+    }
+}
\ No newline at end of file
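Taken together, the driver lets a test stand up either flavor of Accumulo instance with one call and tear it down afterwards. A minimal usage sketch follows; the argument values are illustrative, and only the constructor and methods shown in this diff are assumed:

    import org.apache.accumulo.core.client.Connector;

    import mvm.rya.accumulo.mr.merge.util.AccumuloInstanceDriver;

    public class DriverUsageSketch {
        public static void main(String[] args) throws Exception {
            // A mock parent instance with indices disabled and write permissions.
            AccumuloInstanceDriver driver = new AccumuloInstanceDriver(
                    "parent", true /*isMock*/, false /*shouldCreateIndices*/, false /*isReadOnly*/,
                    true /*isParent*/, "root", "", "parentInstance", "rya_", "test_auth");
            driver.setUp();
            try {
                Connector conn = driver.getConnector();
                // setUp() created the seven core Rya tables under the "rya_" prefix.
                for (String table : driver.getTableList()) {
                    System.out.println(table + " exists: " + conn.tableOperations().exists(table));
                }
            } finally {
                driver.tearDown();
            }
        }
    }

Using "root" as the user matches ROOT_USER_NAME, so setUpTables() skips createLocalUser(), which is the simplest configuration for a MockInstance.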